This is an automated email from the ASF dual-hosted git repository.

timothyjward pushed a commit to branch feature/v1.1
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git


The following commit(s) were added to refs/heads/feature/v1.1 by this push:
     new 6f5754a  Include support for history-only monitoring streams
6f5754a is described below

commit 6f5754a498857cf3f7bb0a34ccadd4792a89d673
Author: Tim Ward <[email protected]>
AuthorDate: Wed Sep 13 12:38:09 2023 +0100

    Include support for history-only monitoring streams
    
    Signed-off-by: Tim Ward <[email protected]>
---
 .../typedevent/bus/impl/TypedEventMonitorImpl.java |  33 ++++--
 .../org/osgi/service/typedevent/TypedEventBus.java |   3 +
 .../service/typedevent/TypedEventPublisher.java    |   2 +
 .../typedevent/monitor/TypedEventMonitor.java      |  35 +++++-
 .../bus/osgi/TypedEventMonitorIntegrationTest.java | 118 +++++++++++++++++++++
 5 files changed, 182 insertions(+), 9 deletions(-)

diff --git 
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
 
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
index 1d177fe..0e2d0b7 100644
--- 
a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
+++ 
b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.java
@@ -100,7 +100,12 @@ public class TypedEventMonitorImpl implements 
TypedEventMonitor {
 
     @Override
     public PushStream<MonitorEvent> monitorEvents(int history) {
-        return psp.buildStream(eventSource(history))
+       return monitorEvents(history, false);
+    }
+    
+    @Override
+    public PushStream<MonitorEvent> monitorEvents(int history, boolean 
historyOnly) {
+        return psp.buildStream(eventSource(history, historyOnly))
                 .withBuffer(new ArrayBlockingQueue<>(Math.max(historySize, 
history)))
                 .withPushbackPolicy(PushbackPolicyOption.FIXED, 
0).withQueuePolicy(QueuePolicyOption.FAIL)
                 .withExecutor(monitoringWorker).build();
@@ -108,12 +113,17 @@ public class TypedEventMonitorImpl implements 
TypedEventMonitor {
 
     @Override
     public PushStream<MonitorEvent> monitorEvents(Instant history) {
-        return psp.buildStream(eventSource(history)).withBuffer(new 
ArrayBlockingQueue<>(1024))
+       return monitorEvents(history, false);
+    }
+    
+    @Override
+    public PushStream<MonitorEvent> monitorEvents(Instant history, boolean 
historyOnly) {
+        return psp.buildStream(eventSource(history, 
historyOnly)).withBuffer(new ArrayBlockingQueue<>(1024))
                 .withPushbackPolicy(PushbackPolicyOption.FIXED, 
0).withQueuePolicy(QueuePolicyOption.FAIL)
                 .withExecutor(monitoringWorker).build();
     }
 
-    PushEventSource<MonitorEvent> eventSource(int events) {
+    PushEventSource<MonitorEvent> eventSource(int events, boolean historyOnly) 
{
 
         return pec -> {
                List<MonitorEvent> list;
@@ -125,11 +135,11 @@ public class TypedEventMonitorImpl implements 
TypedEventMonitor {
                } finally {
                        lock.readLock().unlock();
                }
-               return pushBackwards(pec, list);
+               return pushBackwards(pec, list, historyOnly);
         };
     }
 
-    PushEventSource<MonitorEvent> eventSource(Instant since) {
+    PushEventSource<MonitorEvent> eventSource(Instant since, boolean 
historyOnly) {
 
         return pec -> {
                List<MonitorEvent> list = new ArrayList<>();
@@ -147,11 +157,11 @@ public class TypedEventMonitorImpl implements 
TypedEventMonitor {
                } finally {
                        lock.readLock().unlock();
                }
-               return pushBackwards(pec, list);
+               return pushBackwards(pec, list, historyOnly);
         };
     }
 
-       private AutoCloseable pushBackwards(PushEventConsumer<? super 
MonitorEvent> pec, List<MonitorEvent> list)
+       private AutoCloseable pushBackwards(PushEventConsumer<? super 
MonitorEvent> pec, List<MonitorEvent> list, boolean historyOnly)
                        throws Exception {
                ListIterator<MonitorEvent> li = list.listIterator(list.size());
                while (li.hasPrevious()) {
@@ -165,7 +175,14 @@ public class TypedEventMonitorImpl implements 
TypedEventMonitor {
                                };
                        }
                }
-               return source.open(pec);
+               if(historyOnly) {
+                       try {
+                       pec.accept(PushEvent.close());
+                       } catch (Exception e) {}
+                       return () -> {};
+               } else {
+                       return source.open(pec);
+               }
        }
 
     <T> T copyOfHistory(Function<Stream<MonitorEvent>, T> events) {
diff --git 
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventBus.java
 
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventBus.java
index a73d508..ec89cbe 100644
--- 
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventBus.java
+++ 
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventBus.java
@@ -84,6 +84,7 @@ public interface TypedEventBus {
         *            {@link TypedEventPublisher}
         * @return A {@link TypedEventPublisher} that will publish events to the
         *         topic name automatically generated from {@code eventType}
+        * @since 1.1
         */
        <T> TypedEventPublisher<T> createPublisher(Class<T> eventType);
 
@@ -98,6 +99,7 @@ public interface TypedEventBus {
         *            {@link TypedEventPublisher}
         * @return A {@link TypedEventPublisher} that will publish events to the
         *         supplied topic
+        * @since 1.1
         */
        <T> TypedEventPublisher<T> createPublisher(String topic,
                        Class<T> eventType);
@@ -108,6 +110,7 @@ public interface TypedEventBus {
         * @param topic The topic to publish events to
         * @return A {@link TypedEventPublisher} that will publish events to the
         *         supplied topic
+        * @since 1.1
         */
        TypedEventPublisher<Object> createPublisher(String topic);
 
diff --git 
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventPublisher.java
 
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventPublisher.java
index f7c7b90..fc306e3 100644
--- 
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventPublisher.java
+++ 
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/TypedEventPublisher.java
@@ -35,7 +35,9 @@ import org.osgi.annotation.versioning.ProviderType;
  * @ThreadSafe
  * @author $Id: cace0c5ed2b2e0d6abdf96f4a30e24b9c1610eed $
  * @param <T> The type of events that may be sent using this publisher
+ * @since 1.1
  */
+
 @ProviderType
 public interface TypedEventPublisher<T> extends AutoCloseable {
        /**
diff --git 
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
 
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
index a366b24..37a0372 100644
--- 
a/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
+++ 
b/org.apache.aries.typedevent.bus/src/main/java/org/osgi/service/typedevent/monitor/TypedEventMonitor.java
@@ -42,6 +42,8 @@ public interface TypedEventMonitor {
        /**
         * Get a stream of events, including up to the requested number of
         * historical data events.
+        * <p>
+        * Logically equivalent to <code>monitorEvents(history, false)</code>.
         *
         * @param history The requested number of historical events, note that 
fewer
         *            than this number of events may be returned if history is
@@ -50,9 +52,25 @@ public interface TypedEventMonitor {
         */
        PushStream<MonitorEvent> monitorEvents(int history);
 
+       /**
+        * Get a stream of events, including up to the requested number of
+        * historical data events.
+        *
+        * @param history The requested number of historical events, note that 
fewer
+        *            than this number of events may be returned if history is
+        *            unavailable, or if insufficient events have been sent.
+        * @param historyOnly If <code>true</code> then the returned stream 
will be
+        *            closed as soon as the available history has been delivered
+        * @return A stream of event data
+        * @since 1.1
+        */
+       PushStream<MonitorEvent> monitorEvents(int history, boolean 
historyOnly);
+
        /**
         * Get a stream of events, including historical data events prior to the
-        * supplied time
+        * supplied time.
+        * <p>
+        * Logically equivalent to <code>monitorEvents(history, false)</code>.
         *
         * @param history The requested time after which historical events, 
should
         *            be included. Note that events may have been discarded, or
@@ -61,4 +79,19 @@ public interface TypedEventMonitor {
         */
        PushStream<MonitorEvent> monitorEvents(Instant history);
 
+       /**
+        * Get a stream of events, including historical data events prior to the
+        * supplied time.
+        *
+        * @param history The requested time after which historical events, 
should
+        *            be included. Note that events may have been discarded, or
+        *            history unavailable.
+        * @param historyOnly If <code>true</code> then the returned stream 
will be
+        *            closed as soon as the available history has been delivered
+        * @return A stream of event data
+        * @since 1.1
+        */
+       PushStream<MonitorEvent> monitorEvents(Instant history,
+                       boolean historyOnly);
+
 }
diff --git 
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
 
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
index ee630df..8632823 100644
--- 
a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
+++ 
b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
@@ -226,6 +226,58 @@ public class TypedEventMonitorIntegrationTest extends 
AbstractIntegrationTest {
         assertEquals("bam", events.get(0).eventData.get("message"));
 
 
+    }
+
+    /**
+     * Tests that event history is delivered to the monitor and it
+     * closes the stream after
+     *
+     * @throws InterruptedException
+     * @throws InvocationTargetException
+     */
+    @Test
+    public void testTypedEventMonitorHistory1Close(@InjectService 
TypedEventMonitor monitor, 
+               @InjectService TypedEventBus eventBus) throws 
InterruptedException, InvocationTargetException {
+       
+       TestEvent event = new TestEvent();
+       event.message = "boo";
+       
+       eventBus.deliver(event);
+       
+       event = new TestEvent();
+       event.message = "bam";
+       
+       eventBus.deliver(event);
+       
+       Thread.sleep(500);
+       
+       Promise<List<MonitorEvent>> eventsPromise = monitor.monitorEvents(5, 
true)
+                       .collect(Collectors.toList())
+                       .timeout(2000);
+       
+       List<MonitorEvent> events = eventsPromise.getValue();
+       
+       assertEquals(2, events.size(), events.toString());
+       
+       assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+       assertEquals(TEST_EVENT_TOPIC, events.get(1).topic);
+       
+       assertEquals("boo", events.get(0).eventData.get("message"));
+       assertEquals("bam", events.get(1).eventData.get("message"));
+       
+       eventsPromise = monitor.monitorEvents(1, true)
+                       .collect(Collectors.toList())
+                       .timeout(2000);
+       
+       events = eventsPromise.getValue();
+       
+       assertEquals(1, events.size());
+       
+       assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+       
+       assertEquals("bam", events.get(0).eventData.get("message"));
+       
+       
     }
 
     /**
@@ -306,4 +358,70 @@ public class TypedEventMonitorIntegrationTest extends 
AbstractIntegrationTest {
         assertTrue(events.isEmpty());
     }
 
+    /**
+     * Tests that event history is delivered to the monitor and it closes after
+     *
+     * @throws InterruptedException
+     * @throws InvocationTargetException
+     */
+    @Test
+    public void testTypedEventMonitorHistory2Close(@InjectService 
TypedEventMonitor monitor, 
+               @InjectService TypedEventBus eventBus) throws 
InterruptedException, InvocationTargetException {
+       
+       Instant beforeFirst = Instant.now().minus(Duration.ofMillis(500));
+       
+       TestEvent event = new TestEvent();
+       event.message = "boo";
+       
+       eventBus.deliver(event);
+       
+       Instant afterFirst = Instant.now().plus(Duration.ofMillis(500));
+       
+       Thread.sleep(1000);
+       
+       event = new TestEvent();
+       event.message = "bam";
+       
+       eventBus.deliver(event);
+       
+       Instant afterSecond = Instant.now().plus(Duration.ofMillis(500));
+       
+       Thread.sleep(600);
+       
+       // No stream time limit, this stream should auto-close
+       Promise<List<MonitorEvent>> eventsPromise = 
monitor.monitorEvents(beforeFirst, true)
+                       .collect(Collectors.toList())
+                       .timeout(2000);
+       
+       List<MonitorEvent> events = eventsPromise.getValue();
+       
+       assertEquals(2, events.size());
+       
+       assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+       assertEquals(TEST_EVENT_TOPIC, events.get(1).topic);
+       
+       assertEquals("boo", events.get(0).eventData.get("message"));
+       assertEquals("bam", events.get(1).eventData.get("message"));
+       
+       eventsPromise = monitor.monitorEvents(afterFirst, true)
+                       .collect(Collectors.toList())
+                       .timeout(2000);
+       
+       events = eventsPromise.getValue();
+       
+       assertEquals(1, events.size());
+       
+       assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+       
+       assertEquals("bam", events.get(0).eventData.get("message"));
+       
+       eventsPromise = monitor.monitorEvents(afterSecond, true)
+                       .collect(Collectors.toList())
+                       .timeout(2000);
+       
+       events = eventsPromise.getValue();
+       
+       assertTrue(events.isEmpty());
+    }
+
 }

Reply via email to