Author: timothyjward
Date: Fri Oct 21 15:10:51 2016
New Revision: 1766040
URL: http://svn.apache.org/viewvc?rev=1766040&view=rev
Log:
[pushstream] Initial contribution of Push Streams
Added:
aries/trunk/pushstream/
aries/trunk/pushstream/README.md
aries/trunk/pushstream/pom.xml
aries/trunk/pushstream/pushstream/
aries/trunk/pushstream/pushstream/bnd.bnd
aries/trunk/pushstream/pushstream/pom.xml
aries/trunk/pushstream/pushstream/src/
aries/trunk/pushstream/pushstream/src/main/
aries/trunk/pushstream/pushstream/src/main/java/
aries/trunk/pushstream/pushstream/src/main/java/org/
aries/trunk/pushstream/pushstream/src/main/java/org/apache/
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStream.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java
Modified:
aries/trunk/pom.xml
Modified: aries/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/aries/trunk/pom.xml?rev=1766040&r1=1766039&r2=1766040&view=diff
==============================================================================
--- aries/trunk/pom.xml (original)
+++ aries/trunk/pom.xml Fri Oct 21 15:10:51 2016
@@ -59,6 +59,7 @@
<module>esa-maven-plugin</module>
<module>async</module>
<module>tx-control</module>
+ <module>pushstream</module>
</modules>
<build>
Added: aries/trunk/pushstream/README.md
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/README.md?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/README.md (added)
+++ aries/trunk/pushstream/README.md Fri Oct 21 15:10:51 2016
@@ -0,0 +1,9 @@
+Sample OSGi Pushstream implementation
+-------------------------------------
+
+This project is a prototype implementation of the OSGi Pushstream
specification.
+
+OSGi Push Streams (RFC-216
https://github.com/osgi/design/tree/master/rfcs/rfc0216) are an in-progress RFC
publicly available from the OSGi Alliance. They are also described in chapter
706 of the OSGi R7 Early Draft
https://osgi.org/download/osgi.cmpn-7.0.0-earlydraft1.pdf
+
+Given that the RFC is non-final the OSGi API declared in this project is
subject to change at any time up to its official release. Also the behaviour of
this implementation may not always be up-to-date with the latest wording in the
RFC. The project maintainers will, however try to keep pace with the RFC, and
to ensure that the implementations are compliant with any OSGi specifications
that result from the RFC.
+
Added: aries/trunk/pushstream/pom.xml
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/pom.xml?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pom.xml (added)
+++ aries/trunk/pushstream/pom.xml Fri Oct 21 15:10:51 2016
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache</groupId>
+ <artifactId>apache</artifactId>
+ <version>17</version>
+ <relativePath />
+ </parent>
+
+ <groupId>org.apache.aries.pushstream</groupId>
+ <artifactId>parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>pom</packaging>
+ <description>Apache Aries Push Stream Parent</description>
+ <scm>
+ <connection>
+
scm:svn:http://svn.apache.org/repos/asf/aries/trunk/pushstream
+ </connection>
+ <developerConnection>
+
scm:svn:https://svn.apache.org/repos/asf/aries/trunk/pushstream
+ </developerConnection>
+ <url>
+ http://svn.apache.org/viewvc/aries/trunk/pushstream
+ </url>
+ </scm>
+
+ <profiles>
+ <profile>
+ <id>jdk18</id>
+ <activation>
+ <jdk>1.8</jdk>
+ </activation>
+ <modules>
+ <module>pushstream</module>
+ </modules>
+ </profile>
+ </profiles>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>osgi.annotation</artifactId>
+ <version>6.0.1</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.aries.async</groupId>
+
<artifactId>org.apache.aries.async.promise.api</artifactId>
+ <version>1.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.0</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+
<artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+
<groupId>org.apache.maven.plugins</groupId>
+
<artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+
<useDefaultManifestFile>true</useDefaultManifestFile>
+ </configuration>
+ </plugin>
+ <plugin>
+
<groupId>org.apache.maven.plugins</groupId>
+
<artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+
<artifactId>bnd-maven-plugin</artifactId>
+ <version>3.2.0</version>
+ <executions>
+ <execution>
+
<id>default-bnd-process</id>
+ <goals>
+
<goal>bnd-process</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+</project>
Added: aries/trunk/pushstream/pushstream/bnd.bnd
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/bnd.bnd?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/bnd.bnd (added)
+++ aries/trunk/pushstream/pushstream/bnd.bnd Fri Oct 21 15:10:51 2016
@@ -0,0 +1 @@
+Export-Package: ${packages;VERSIONED}
Added: aries/trunk/pushstream/pushstream/pom.xml
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/pom.xml?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/pom.xml (added)
+++ aries/trunk/pushstream/pushstream/pom.xml Fri Oct 21 15:10:51 2016
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.aries.pushstream</groupId>
+ <artifactId>parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <groupId>org.apache.aries.pushstream</groupId>
+ <artifactId>pushstream</artifactId>
+ <name>Apache Aries Push Streams</name>
+ <version>0.0.1-SNAPSHOT</version>
+
+
+ <description>
+ This bundle contains the Apache Aries OSGi Push Stream implementation.
+ </description>
+
+ <scm>
+ <connection>
+
scm:svn:http://svn.apache.org/repos/asf/aries/trunk/pushstream/pushstream
+ </connection>
+ <developerConnection>
+
scm:svn:https://svn.apache.org/repos/asf/aries/trunk/pushstream/pushstream
+ </developerConnection>
+ <url>
+ http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream
+ </url>
+ </scm>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>osgi.annotation</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.aries.async</groupId>
+
<artifactId>org.apache.aries.async.promise.api</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Added:
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java?rev=1766040&view=auto
==============================================================================
---
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java
(added)
+++
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java
Fri Oct 21 15:10:51 2016
@@ -0,0 +1,1374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.*;
+import static org.osgi.util.pushstream.PushEventConsumer.*;
+
+import java.time.Duration;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntSupplier;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushEventConsumer;
+import org.osgi.util.pushstream.PushEventSource;
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamBuilder;
+import org.osgi.util.pushstream.PushStreamProvider;
+import org.osgi.util.pushstream.PushEvent.EventType;
+
+public abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
+
+ public static enum State {
+ BUILDING, STARTED, CLOSED
+ }
+
+ protected final PushStreamProvider
psp;
+
+ protected final Executor
defaultExecutor;
+ protected final ScheduledExecutorService
scheduler;
+
+ protected final AtomicReference<State> closed = new
AtomicReference<>(BUILDING);
+
+ protected final AtomicReference<PushEventConsumer<T>>
next = new AtomicReference<>();
+
+ protected final AtomicReference<Runnable> onCloseCallback = new
AtomicReference<>();
+ protected final AtomicReference<Consumer<? super Throwable>>
onErrorCallback = new AtomicReference<>();
+
+ protected abstract boolean begin();
+
+ protected AbstractPushStreamImpl(PushStreamProvider psp,
+ Executor executor, ScheduledExecutorService scheduler) {
+ this.psp = psp;
+ this.defaultExecutor = executor;
+ this.scheduler = scheduler;
+ }
+
+ protected long handleEvent(PushEvent< ? extends T> event) {
+ if(closed.get() != CLOSED) {
+ try {
+ if(event.isTerminal()) {
+ close(event.nodata());
+ return ABORT;
+ } else {
+ PushEventConsumer<T> consumer =
next.get();
+ long val;
+ if(consumer == null) {
+ //TODO log a warning
+ val = CONTINUE;
+ } else {
+ val = consumer.accept(event);
+ }
+ if(val < 0) {
+ close();
+ }
+ return val;
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ }
+ return ABORT;
+ }
+
+ @Override
+ public void close() {
+ close(PushEvent.close());
+ }
+
+ protected boolean close(PushEvent<T> event) {
+ if(!event.isTerminal()) {
+ throw new IllegalArgumentException("The event " + event
+ " is not a close event.");
+ }
+ if(closed.getAndSet(CLOSED) != CLOSED) {
+ PushEventConsumer<T> aec = next.getAndSet(null);
+ if(aec != null) {
+ try {
+ aec.accept(event);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ Runnable handler = onCloseCallback.getAndSet(null);
+ if(handler != null) {
+ try {
+ handler.run();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ if (event.getType() == EventType.ERROR) {
+ Consumer<? super Throwable> errorHandler =
onErrorCallback.getAndSet(null);
+ if(errorHandler != null) {
+ try {
+
errorHandler.accept(event.getFailure());
+ } catch (Exception e) {
+ // TODO Auto-generated catch
block
+ e.printStackTrace();
+ }
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public PushStream<T> onClose(Runnable closeHandler) {
+ if(onCloseCallback.compareAndSet(null, closeHandler)) {
+ if(closed.get() == State.CLOSED &&
onCloseCallback.compareAndSet(closeHandler, null)) {
+ closeHandler.run();
+ }
+ } else {
+ throw new IllegalStateException("A close handler has
already been defined for this stream object");
+ }
+ return this;
+ }
+
+ @Override
+ public PushStream<T> onError(Consumer< ? super Throwable> closeHandler)
{
+ if(onErrorCallback.compareAndSet(null, closeHandler)) {
+ if(closed.get() == State.CLOSED) {
+ //TODO log already closed
+ onErrorCallback.set(null);
+ }
+ } else {
+ throw new IllegalStateException("A close handler has
already been defined for this stream object");
+ }
+ return this;
+ }
+
+ private void updateNext(PushEventConsumer<T> consumer) {
+ if(!next.compareAndSet(null, consumer)) {
+ throw new IllegalStateException("This stream has
already been chained");
+ } else if(closed.get() == CLOSED &&
next.compareAndSet(consumer, null)) {
+ try {
+ consumer.accept(PushEvent.close());
+ } catch (Exception e) {
+ //TODO log
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public PushStream<T> filter(Predicate< ? super T> predicate) {
+ AbstractPushStreamImpl<T> eventStream = new
IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ updateNext((event) -> {
+ try {
+ if (!event.isTerminal()) {
+ if (predicate.test(event.getData())) {
+ return
eventStream.handleEvent(event);
+ } else {
+ return CONTINUE;
+ }
+ }
+ return eventStream.handleEvent(event);
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public <R> PushStream<R> map(Function< ? super T, ? extends R> mapper) {
+
+ AbstractPushStreamImpl<R> eventStream = new
IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ updateNext(event -> {
+ try {
+ if (!event.isTerminal()) {
+ return eventStream.handleEvent(
+
PushEvent.data(mapper.apply(event.getData())));
+ } else {
+ return
eventStream.handleEvent(event.nodata());
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public <R> PushStream<R> flatMap(
+ Function< ? super T, ? extends PushStream< ? extends
R>> mapper) {
+ AbstractPushStreamImpl<R> eventStream = new
IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+
+ PushEventConsumer<R> consumer = e -> {
+ switch (e.getType()) {
+ case ERROR :
+ close(e.nodata());
+ return ABORT;
+ case CLOSE :
+ // Close should allow the next flat
mapped entry
+ // without closing the stream;
+ return ABORT;
+ case DATA :
+ long returnValue =
eventStream.handleEvent(e);
+ if (returnValue < 0) {
+ close();
+ return ABORT;
+ }
+ return returnValue;
+ default :
+ throw new IllegalArgumentException(
+ "The event type " +
e.getType() + " is unknown");
+ }
+ };
+
+ updateNext(event -> {
+ try {
+ if (!event.isTerminal()) {
+ PushStream< ? extends R> mappedStream =
mapper
+ .apply(event.getData());
+
+ return
mappedStream.forEachEvent(consumer)
+ .getValue()
+ .longValue();
+ } else {
+ return
eventStream.handleEvent(event.nodata());
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public PushStream<T> distinct() {
+ Set<T> set = Collections.<T>newSetFromMap(new
ConcurrentHashMap<>());
+ return filter(set::add);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public PushStream<T> sorted() {
+ return sorted((Comparator)Comparator.naturalOrder());
+ }
+
+ @Override
+ public PushStream<T> sorted(Comparator< ? super T> comparator) {
+ List<T> list = Collections.synchronizedList(new ArrayList<>());
+ AbstractPushStreamImpl<T> eventStream = new
IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ updateNext(event -> {
+ try {
+ switch(event.getType()) {
+ case DATA :
+ list.add(event.getData());
+ return CONTINUE;
+ case CLOSE :
+ list.sort(comparator);
+ for(T t : list) {
+
eventStream.handleEvent(PushEvent.data(t));
+ }
+ return ABORT;
+ case ERROR :
+ return
eventStream.handleEvent(event.nodata());
+ }
+ return eventStream.handleEvent(event.nodata());
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public PushStream<T> limit(long maxSize) {
+ if(maxSize <= 0) {
+ throw new IllegalArgumentException("The limit must be
greater than zero");
+ }
+ AbstractPushStreamImpl<T> eventStream = new
IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ AtomicLong counter = new AtomicLong(maxSize);
+ updateNext(event -> {
+ try {
+ if (!event.isTerminal()) {
+ long count = counter.decrementAndGet();
+ if (count > 0) {
+ return
eventStream.handleEvent(event);
+ } else if (count == 0) {
+ eventStream.handleEvent(event);
+ }
+ return ABORT;
+ } else {
+ return
eventStream.handleEvent(event.nodata());
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public PushStream<T> skip(long n) {
+ if(n <= 0) {
+ throw new IllegalArgumentException("The number to skip
must be greater than zero");
+ }
+ AbstractPushStreamImpl<T> eventStream = new
IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ AtomicLong counter = new AtomicLong(n);
+ updateNext(event -> {
+ try {
+ if (!event.isTerminal()) {
+ if (counter.get() > 0 &&
counter.decrementAndGet() >= 0) {
+ return CONTINUE;
+ } else {
+ return
eventStream.handleEvent(event);
+ }
+ } else {
+ return
eventStream.handleEvent(event.nodata());
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public PushStream<T> fork(int n, int delay, Executor ex) {
+ AbstractPushStreamImpl<T> eventStream = new
IntermediatePushStreamImpl<>(
+ psp, ex, scheduler, this);
+ Semaphore s = new Semaphore(n);
+ updateNext(event -> {
+ try {
+ if (event.isTerminal()) {
+ s.acquire(n);
+ eventStream.close(event.nodata());
+ return ABORT;
+ }
+
+ s.acquire(1);
+
+ ex.execute(() -> {
+ try {
+ if
(eventStream.handleEvent(event) < 0) {
+
eventStream.close(PushEvent.close());
+ }
+ } catch (Exception e1) {
+ close(PushEvent.error(e1));
+ } finally {
+ s.release(1);
+ }
+ });
+
+ return s.getQueueLength() * delay;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+
+ return eventStream;
+ }
+
+ @Override
+ public PushStream<T> buffer() {
+ return psp.createStream(c -> {
+ forEachEvent(c);
+ return this;
+ });
+ }
+
+ @Override
+ public <U extends BlockingQueue<PushEvent< ? extends T>>>
PushStreamBuilder<T,U> buildBuffer() {
+ return psp.buildStream(c -> {
+ forEachEvent(c);
+ return this;
+ });
+ }
+
+ @Override
+ public PushStream<T> merge(
+ PushEventSource< ? extends T> source) {
+ AbstractPushStreamImpl<T> eventStream = new
IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ AtomicInteger count = new AtomicInteger(2);
+ PushEventConsumer<T> consumer = event -> {
+ try {
+ if (!event.isTerminal()) {
+ return eventStream.handleEvent(event);
+ }
+
+ if (count.decrementAndGet() == 0) {
+ eventStream.handleEvent(event.nodata());
+ return ABORT;
+ }
+ return CONTINUE;
+ } catch (Exception e) {
+ PushEvent<T> error = PushEvent.error(e);
+ close(error);
+ eventStream.close(event.nodata());
+ return ABORT;
+ }
+ };
+ updateNext(consumer);
+ AutoCloseable second;
+ try {
+ second = source.open((PushEvent< ? extends T> event) ->
{
+ return consumer.accept(event);
+ });
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ throw new IllegalStateException(
+ "Unable to merge events as the event
source could not be opened.",
+ e);
+ }
+
+ return eventStream.onClose(() -> {
+ try {
+ second.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }).map(Function.identity());
+ }
+
+ @Override
+ public PushStream<T> merge(PushStream< ? extends T> source) {
+ AbstractPushStreamImpl<T> eventStream = new
IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ AtomicInteger count = new AtomicInteger(2);
+ PushEventConsumer<T> consumer = event -> {
+ try {
+ if (!event.isTerminal()) {
+ return eventStream.handleEvent(event);
+ }
+
+ if (count.decrementAndGet() == 0) {
+ eventStream.handleEvent(event.nodata());
+ return ABORT;
+ }
+ return CONTINUE;
+ } catch (Exception e) {
+ PushEvent<T> error = PushEvent.error(e);
+ close(error);
+ eventStream.close(event.nodata());
+ return ABORT;
+ }
+ };
+ updateNext(consumer);
+ try {
+ source.forEachEvent(event -> {
+ return consumer.accept(event);
+ }).then(p -> {
+ count.decrementAndGet();
+ consumer.accept(PushEvent.close());
+ return null;
+ }, p -> {
+ count.decrementAndGet();
+ consumer.accept(PushEvent.error((Exception)
p.getFailure()));
+ });
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ throw new IllegalStateException(
+ "Unable to merge events as the event
source could not be opened.",
+ e);
+ }
+
+ return eventStream.onClose(() -> {
+ try {
+ source.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }).map(Function.identity());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public PushStream<T>[] split(Predicate< ? super T>... predicates) {
+ Predicate<? super T>[] tests = Arrays.copyOf(predicates,
predicates.length);
+ AbstractPushStreamImpl<T>[] rsult = new
AbstractPushStreamImpl[tests.length];
+ for(int i = 0; i < tests.length; i++) {
+ rsult[i] = new IntermediatePushStreamImpl<>(psp,
defaultExecutor,
+ scheduler, this);
+ }
+ AtomicReferenceArray<Boolean> off = new
AtomicReferenceArray<>(tests.length);
+ AtomicInteger count = new AtomicInteger(tests.length);
+ updateNext(event -> {
+ if (!event.isTerminal()) {
+ long delay = CONTINUE;
+ for (int i = 0; i < tests.length; i++) {
+ try {
+ if (off.get(i).booleanValue()
+ &&
tests[i].test(event.getData())) {
+ long accept =
rsult[i].handleEvent(event);
+ if (accept < 0) {
+ off.set(i,
Boolean.TRUE);
+
count.decrementAndGet();
+ } else if (accept >
delay) {
+ accept = delay;
+ }
+ }
+ } catch (Exception e) {
+ try {
+
rsult[i].close(PushEvent.error(e));
+ } catch (Exception e2) {
+ //TODO log
+ }
+ off.set(i, Boolean.TRUE);
+ }
+ }
+ if (count.get() == 0)
+ return ABORT;
+
+ return delay;
+ }
+ for (AbstractPushStreamImpl<T> as : rsult) {
+ try {
+ as.handleEvent(event.nodata());
+ } catch (Exception e) {
+ try {
+ as.close(PushEvent.error(e));
+ } catch (Exception e2) {
+ //TODO log
+ }
+ }
+ }
+ return ABORT;
+ });
+ return Arrays.copyOf(rsult, tests.length);
+ }
+
+ @Override
+ public PushStream<T> sequential() {
+ AbstractPushStreamImpl<T> eventStream = new
IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ Lock lock = new ReentrantLock();
+ updateNext((event) -> {
+ try {
+ lock.lock();
+ try {
+ return eventStream.handleEvent(event);
+ } finally {
+ lock.unlock();
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public <R> PushStream<R> coalesce(
+ Function< ? super T,Optional<R>> accumulator) {
+ AbstractPushStreamImpl<R> eventStream = new
IntermediatePushStreamImpl<>(
+ psp, defaultExecutor, scheduler, this);
+ updateNext((event) -> {
+ try {
+ if (!event.isTerminal()) {
+ Optional<PushEvent<R>> coalesced =
accumulator
+
.apply(event.getData()).map(PushEvent::data);
+ if (coalesced.isPresent()) {
+ try {
+ return
eventStream.handleEvent(coalesced.get());
+ } catch (Exception ex) {
+
close(PushEvent.error(ex));
+ return ABORT;
+ }
+ } else {
+ return CONTINUE;
+ }
+ }
+ return eventStream.handleEvent(event.nodata());
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ @Override
+ public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R>
f) {
+ if (count <= 0)
+ throw new IllegalArgumentException(
+ "A coalesce operation must collect a
positive number of events");
+ // This could be optimised to only use a single collection
queue.
+ // It would save some GC, but is it worth it?
+ return coalesce(() -> count, f);
+ }
+
+ @Override
+ public <R> PushStream<R> coalesce(IntSupplier count,
+ Function<Collection<T>,R> f) {
+ AtomicReference<Queue<T>> queueRef = new
AtomicReference<Queue<T>>(
+ null);
+
+ Runnable init = () -> queueRef
+
.set(getQueueForInternalBuffering(count.getAsInt()));
+
+ @SuppressWarnings("resource")
+ AbstractPushStreamImpl<R> eventStream = new
IntermediatePushStreamImpl<R>(
+ psp, defaultExecutor, scheduler, this) {
+ @Override
+ protected void beginning() {
+ init.run();
+ }
+ };
+
+ AtomicBoolean endPending = new AtomicBoolean();
+ Object lock = new Object();
+ updateNext((event) -> {
+ try {
+ Queue<T> queue;
+ if (!event.isTerminal()) {
+ synchronized (lock) {
+ for (;;) {
+ queue = queueRef.get();
+ if (queue == null) {
+ if
(endPending.get()) {
+ return
ABORT;
+ } else {
+
continue;
+ }
+ } else if
(queue.offer(event.getData())) {
+ return CONTINUE;
+ } else {
+
queueRef.lazySet(null);
+ break;
+ }
+ }
+ }
+
+ queueRef.set(
+
getQueueForInternalBuffering(count.getAsInt()));
+
+ // This call is on the same thread and
so must happen
+ // outside
+ // the synchronized block.
+ return aggregateAndForward(f,
eventStream, event,
+ queue);
+ } else {
+ synchronized (lock) {
+ queue = queueRef.get();
+ queueRef.lazySet(null);
+ endPending.set(true);
+ }
+ if (queue != null) {
+ eventStream.handleEvent(
+
PushEvent.data(f.apply(queue)));
+ }
+ }
+ return eventStream.handleEvent(event.nodata());
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ private <R> long aggregateAndForward(Function<Collection<T>,R> f,
+ AbstractPushStreamImpl<R> eventStream,
+ PushEvent< ? extends T> event, Queue<T> queue) {
+ if (!queue.offer(event.getData())) {
+ ((ArrayQueue<T>) queue).forcePush(event.getData());
+ }
+ return eventStream.handleEvent(PushEvent.data(f.apply(queue)));
+ }
+
+
+ @Override
+ public <R> PushStream<R> window(Duration time,
+ Function<Collection<T>,R> f) {
+ return window(time, defaultExecutor, f);
+ }
+
+ @Override
+ public <R> PushStream<R> window(Duration time, Executor executor,
+ Function<Collection<T>,R> f) {
+ return window(() -> time, () -> 0, executor, (t, c) ->
f.apply(c));
+ }
+
+ @Override
+ public <R> PushStream<R> window(Supplier<Duration> time,
+ IntSupplier maxEvents,
+ BiFunction<Long,Collection<T>,R> f) {
+ return window(time, maxEvents, defaultExecutor, f);
+ }
+
+ @Override
+ public <R> PushStream<R> window(Supplier<Duration> time,
+ IntSupplier maxEvents, Executor ex,
+ BiFunction<Long,Collection<T>,R> f) {
+
+ AtomicLong timestamp = new AtomicLong();
+ AtomicLong counter = new AtomicLong();
+ Object lock = new Object();
+ AtomicReference<Queue<T>> queueRef = new
AtomicReference<Queue<T>>(
+ null);
+
+ // This code is declared as a separate block to avoid any
confusion
+ // about which instance's methods and variables are in scope
+ Consumer<AbstractPushStreamImpl<R>> begin = p -> {
+
+ synchronized (lock) {
+ timestamp.lazySet(System.nanoTime());
+ long count = counter.get();
+
+
+ scheduler.schedule(
+ getWindowTask(p, f, time,
maxEvents, lock, count,
+ queueRef,
timestamp, counter, ex),
+ time.get().toNanos(),
NANOSECONDS);
+ }
+
+
queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
+ };
+
+ @SuppressWarnings("resource")
+ AbstractPushStreamImpl<R> eventStream = new
IntermediatePushStreamImpl<R>(
+ psp, ex, scheduler, this) {
+ @Override
+ protected void beginning() {
+ begin.accept(this);
+ }
+ };
+
+ AtomicBoolean endPending = new AtomicBoolean(false);
+ updateNext((event) -> {
+ try {
+ if (eventStream.closed.get() == CLOSED) {
+ return ABORT;
+ }
+ Queue<T> queue;
+ if (!event.isTerminal()) {
+ long elapsed;
+ long newCount;
+ synchronized (lock) {
+ for (;;) {
+ queue = queueRef.get();
+ if (queue == null) {
+ if
(endPending.get()) {
+ return
ABORT;
+ } else {
+
continue;
+ }
+ } else if
(queue.offer(event.getData())) {
+ return CONTINUE;
+ } else {
+
queueRef.lazySet(null);
+ break;
+ }
+ }
+
+ long now = System.nanoTime();
+ elapsed = now - timestamp.get();
+ timestamp.lazySet(now);
+ newCount = counter.get() + 1;
+ counter.lazySet(newCount);
+
+ // This is a non-blocking call,
and must happen in the
+ // synchronized block to avoid
re=ordering the executor
+ // enqueue with a subsequent
incoming close operation
+ aggregateAndForward(f,
eventStream, event, queue,
+ ex, elapsed);
+ }
+ // These must happen outside the
synchronized block as we
+ // call out to user code
+ queueRef.set(
+
getQueueForInternalBuffering(maxEvents.getAsInt()));
+ scheduler.schedule(
+
getWindowTask(eventStream, f, time, maxEvents, lock,
+
newCount, queueRef, timestamp, counter, ex),
+ time.get().toNanos(),
NANOSECONDS);
+
+ return CONTINUE;
+ } else {
+ long elapsed;
+ synchronized (lock) {
+ queue = queueRef.get();
+ queueRef.lazySet(null);
+ endPending.set(true);
+ long now = System.nanoTime();
+ elapsed = now - timestamp.get();
+ counter.lazySet(counter.get() +
1);
+ }
+ Collection<T> collected = queue == null
? emptyList()
+ : queue;
+ ex.execute(() -> {
+ try {
+ eventStream
+
.handleEvent(PushEvent.data(f.apply(
+
Long.valueOf(NANOSECONDS
+
.toMillis(elapsed)),
+
collected)));
+ } catch (Exception e) {
+
close(PushEvent.error(e));
+ }
+ });
+ }
+ ex.execute(() ->
eventStream.handleEvent(event.nodata()));
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ return eventStream;
+ }
+
+ protected Queue<T> getQueueForInternalBuffering(int size) {
+ if (size == 0) {
+ return new LinkedList<T>();
+ } else {
+ return new ArrayQueue<>(size - 1);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ /**
+ * A special queue that keeps one element in reserve and can have that
last
+ * element set using forcePush. After the element is set the capacity is
+ * permanently increased by one and cannot grow further.
+ *
+ * @param <E> The element type
+ */
+ private static class ArrayQueue<E> extends AbstractQueue<E>
+ implements Queue<E> {
+
+ final Object[] store;
+
+ int normalLength;
+
+ int nextIndex;
+
+ int size;
+
+ ArrayQueue(int capacity) {
+ store = new Object[capacity + 1];
+ normalLength = store.length - 1;
+ }
+
+ @Override
+ public boolean offer(E e) {
+ if (e == null)
+ throw new NullPointerException("Null values are
not supported");
+ if (size < normalLength) {
+ store[nextIndex] = e;
+ size++;
+ nextIndex++;
+ nextIndex = nextIndex % normalLength;
+ return true;
+ }
+ return false;
+ }
+
+ public void forcePush(E e) {
+ store[normalLength] = e;
+ normalLength++;
+ size++;
+ }
+
+ @Override
+ public E poll() {
+ if (size == 0) {
+ return null;
+ } else {
+ int idx = nextIndex - size;
+ if (idx < 0) {
+ idx += normalLength;
+ }
+ E value = (E) store[idx];
+ store[idx] = null;
+ size--;
+ return value;
+ }
+ }
+
+ @Override
+ public E peek() {
+ if (size == 0) {
+ return null;
+ } else {
+ int idx = nextIndex - size;
+ if (idx < 0) {
+ idx += normalLength;
+ }
+ return (E) store[idx];
+ }
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ final int previousNext = nextIndex;
+ return new Iterator<E>() {
+
+ int idx;
+
+ int remaining = size;
+
+ {
+ idx = nextIndex - size;
+ if (idx < 0) {
+ idx += normalLength;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nextIndex != previousNext) {
+ throw new
ConcurrentModificationException(
+ "The queue was
concurrently modified");
+ }
+ return remaining > 0;
+ }
+
+ @Override
+ public E next() {
+ if (!hasNext()) {
+ throw new
NoSuchElementException(
+ "The iterator
has no more values");
+ }
+ E value = (E) store[idx];
+ idx++;
+ remaining--;
+ if (idx == normalLength) {
+ idx = 0;
+ }
+ return value;
+ }
+
+ };
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ }
+
+ private <R> Runnable getWindowTask(AbstractPushStreamImpl<R>
eventStream,
+ BiFunction<Long,Collection<T>,R> f, Supplier<Duration>
time,
+ IntSupplier maxEvents, Object lock, long
expectedCounter,
+ AtomicReference<Queue<T>> queueRef, AtomicLong
timestamp,
+ AtomicLong counter, Executor executor) {
+ return () -> {
+
+ Queue<T> queue = null;
+ long elapsed;
+ synchronized (lock) {
+
+ if (counter.get() != expectedCounter) {
+ return;
+ }
+ counter.lazySet(expectedCounter + 1);
+
+ long now = System.nanoTime();
+ elapsed = now - timestamp.get();
+ timestamp.lazySet(now);
+
+ queue = queueRef.get();
+ queueRef.lazySet(null);
+
+ // This is a non-blocking call, and must happen
in the
+ // synchronized block to avoid re=ordering the
executor
+ // enqueue with a subsequent incoming close
operation
+
+ Collection<T> collected = queue == null ?
emptyList() : queue;
+ executor.execute(() -> {
+ try {
+
eventStream.handleEvent(PushEvent.data(f.apply(
+
Long.valueOf(NANOSECONDS.toMillis(elapsed)),
+ collected)));
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ }
+ });
+ }
+
+ // These must happen outside the synchronized block as
we
+ // call out to user code
+
queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
+ scheduler.schedule(
+ getWindowTask(eventStream, f, time,
maxEvents, lock,
+ expectedCounter + 1,
queueRef, timestamp, counter,
+ executor),
+ time.get().toNanos(), NANOSECONDS);
+ };
+ }
+
+ private <R> void aggregateAndForward(BiFunction<Long,Collection<T>,R> f,
+ AbstractPushStreamImpl<R> eventStream,
+ PushEvent< ? extends T> event, Queue<T> queue, Executor
executor,
+ long elapsed) {
+ executor.execute(() -> {
+ try {
+ if (!queue.offer(event.getData())) {
+ ((ArrayQueue<T>)
queue).forcePush(event.getData());
+ }
+ long result =
eventStream.handleEvent(PushEvent.data(
+
f.apply(Long.valueOf(NANOSECONDS.toMillis(elapsed)),
+ queue)));
+ if (result < 0) {
+ close();
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ }
+ });
+ }
+
+ @Override
+ public Promise<Void> forEach(Consumer< ? super T> action) {
+ Deferred<Void> d = new Deferred<>();
+ updateNext((event) -> {
+ try {
+ switch(event.getType()) {
+ case DATA:
+
action.accept(event.getData());
+ return CONTINUE;
+ case CLOSE:
+ d.resolve(null);
+ break;
+ case ERROR:
+
d.fail(event.getFailure());
+ break;
+ }
+ close(event.nodata());
+ return ABORT;
+ } catch (Exception e) {
+ d.fail(e);
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public Promise<Object[]> toArray() {
+ return collect(Collectors.toList())
+ .map(List::toArray);
+ }
+
+ @Override
+ public <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator) {
+ return collect(Collectors.toList())
+ .map(l -> l.toArray(generator.apply(l.size())));
+ }
+
+ @Override
+ public Promise<T> reduce(T identity, BinaryOperator<T> accumulator) {
+ Deferred<T> d = new Deferred<>();
+ AtomicReference<T> iden = new AtomicReference<T>(identity);
+
+ updateNext(event -> {
+ try {
+ switch(event.getType()) {
+ case DATA:
+
iden.accumulateAndGet(event.getData(), accumulator);
+ return CONTINUE;
+ case CLOSE:
+ d.resolve(iden.get());
+ break;
+ case ERROR:
+ d.fail(event.getFailure());
+ break;
+ }
+ close(event.nodata());
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator) {
+ Deferred<Optional<T>> d = new Deferred<>();
+ AtomicReference<T> iden = new AtomicReference<T>(null);
+
+ updateNext(event -> {
+ try {
+ switch(event.getType()) {
+ case DATA:
+ if (!iden.compareAndSet(null,
event.getData()))
+
iden.accumulateAndGet(event.getData(), accumulator);
+ return CONTINUE;
+ case CLOSE:
+
d.resolve(Optional.ofNullable(iden.get()));
+ break;
+ case ERROR:
+ d.fail(event.getFailure());
+ break;
+ }
+ close(event.nodata());
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U>
accumulator, BinaryOperator<U> combiner) {
+ Deferred<U> d = new Deferred<>();
+ AtomicReference<U> iden = new AtomicReference<>(identity);
+
+ updateNext(event -> {
+ try {
+ switch(event.getType()) {
+ case DATA:
+ iden.updateAndGet((e) ->
accumulator.apply(e, event.getData()));
+ return CONTINUE;
+ case CLOSE:
+ d.resolve(iden.get());
+ break;
+ case ERROR:
+ d.fail(event.getFailure());
+ break;
+ }
+ close(event.nodata());
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public <R, A> Promise<R> collect(Collector<? super T, A, R> collector) {
+ A result = collector.supplier().get();
+ Deferred<R> d = new Deferred<>();
+ updateNext(event -> {
+ try {
+ switch(event.getType()) {
+ case DATA:
+
collector.accumulator().accept(result, event.getData());
+ return CONTINUE;
+ case CLOSE:
+
d.resolve(collector.finisher().apply(result));
+ break;
+ case ERROR:
+ d.fail(event.getFailure());
+ break;
+ }
+ close(event.nodata());
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public Promise<Optional<T>> min(Comparator<? super T> comparator) {
+ return reduce((a, b) -> comparator.compare(a, b) <= 0 ? a : b);
+ }
+
+ @Override
+ public Promise<Optional<T>> max(Comparator<? super T> comparator) {
+ return reduce((a, b) -> comparator.compare(a, b) > 0 ? a : b);
+ }
+
+ @Override
+ public Promise<Long> count() {
+ Deferred<Long> d = new Deferred<>();
+ LongAdder counter = new LongAdder();
+ updateNext((event) -> {
+ try {
+ switch(event.getType()) {
+ case DATA:
+ counter.add(1);
+ return CONTINUE;
+ case CLOSE:
+
d.resolve(Long.valueOf(counter.sum()));
+ break;
+ case ERROR:
+
d.fail(event.getFailure());
+ break;
+ }
+ close(event.nodata());
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public Promise<Boolean> anyMatch(Predicate<? super T> predicate) {
+ return filter(predicate).findAny()
+ .map(Optional::isPresent);
+ }
+
+ @Override
+ public Promise<Boolean> allMatch(Predicate<? super T> predicate) {
+ return filter(x -> !predicate.test(x)).findAny()
+ .map(o -> Boolean.valueOf(!o.isPresent()));
+ }
+
+ @Override
+ public Promise<Boolean> noneMatch(Predicate<? super T> predicate) {
+ return filter(predicate).findAny()
+ .map(o -> Boolean.valueOf(!o.isPresent()));
+ }
+
+ @Override
+ public Promise<Optional<T>> findFirst() {
+ Deferred<Optional<T>> d = new Deferred<>();
+ updateNext((event) -> {
+ try {
+ Optional<T> o = null;
+ switch(event.getType()) {
+ case DATA:
+ o =
Optional.of(event.getData());
+ break;
+ case CLOSE:
+ o = Optional.empty();
+ break;
+ case ERROR:
+
d.fail(event.getFailure());
+ return ABORT;
+ }
+ if(!d.getPromise().isDone())
+ d.resolve(o);
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+ @Override
+ public Promise<Optional<T>> findAny() {
+ return findFirst();
+ }
+
+ @Override
+ public Promise<Long> forEachEvent(PushEventConsumer< ? super T> action)
{
+ Deferred<Long> d = new Deferred<>();
+ LongAdder la = new LongAdder();
+ updateNext((event) -> {
+ try {
+ switch(event.getType()) {
+ case DATA:
+ long value =
action.accept(event);
+ la.add(value);
+ return value;
+ case CLOSE:
+ try {
+ action.accept(event);
+ } finally {
+
d.resolve(Long.valueOf(la.sum()));
+ }
+ break;
+ case ERROR:
+ try {
+ action.accept(event);
+ } finally {
+
d.fail(event.getFailure());
+ }
+ break;
+ }
+ return ABORT;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ });
+ begin();
+ return d.getPromise();
+ }
+
+}
Added:
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java?rev=1766040&view=auto
==============================================================================
---
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java
(added)
+++
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java
Fri Oct 21 15:10:51 2016
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.CLOSED;
+import static org.osgi.util.pushstream.PushEventConsumer.ABORT;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushEventConsumer;
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamProvider;
+import org.osgi.util.pushstream.PushbackPolicy;
+import org.osgi.util.pushstream.QueuePolicy;
+
+public class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ?
extends T>>>
+ extends UnbufferedPushStreamImpl<T,U> implements PushStream<T> {
+
+ private final U eventQueue;
+
+ private final Semaphore semaphore;
+
+ private final Executor worker;
+
+ private final QueuePolicy<T, U> queuePolicy;
+
+ private final PushbackPolicy<T, U> pushbackPolicy;
+
+ /**
+ * Indicates that a terminal event has been received, that we should
stop
+ * collecting new events, and that we must drain the buffer before
+ * continuing
+ */
+ private final AtomicBoolean softClose = new
AtomicBoolean();
+
+ private final int parallelism;
+
+ public BufferedPushStreamImpl(PushStreamProvider psp,
+ ScheduledExecutorService scheduler, U eventQueue,
+ int parallelism, Executor worker, QueuePolicy<T,U>
queuePolicy,
+ PushbackPolicy<T,U> pushbackPolicy,
+ Function<PushEventConsumer<T>,AutoCloseable> connector)
{
+ super(psp, worker, scheduler, connector);
+ this.eventQueue = eventQueue;
+ this.parallelism = parallelism;
+ this.semaphore = new Semaphore(parallelism);
+ this.worker = worker;
+ this.queuePolicy = queuePolicy;
+ this.pushbackPolicy = pushbackPolicy;
+ }
+
+ @Override
+ protected long handleEvent(PushEvent< ? extends T> event) {
+
+ // If we have already been soft closed, or hard closed then
abort
+ if (!softClose.compareAndSet(false, event.isTerminal())
+ || closed.get() == CLOSED) {
+ return ABORT;
+ }
+
+ try {
+ queuePolicy.doOffer(eventQueue, event);
+ long backPressure = pushbackPolicy.pushback(eventQueue);
+ if(backPressure < 0) {
+ close();
+ return ABORT;
+ }
+ if(semaphore.tryAcquire()) {
+ startWorker();
+ }
+ return backPressure;
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ return ABORT;
+ }
+ }
+
+ private void startWorker() {
+ worker.execute(() -> {
+ try {
+ PushEvent< ? extends T> event;
+ while ((event = eventQueue.poll()) != null) {
+ if (event.isTerminal()) {
+ // Wait for the other threads
to finish
+ semaphore.acquire(parallelism -
1);
+ }
+
+ long backpressure =
super.handleEvent(event);
+ if(backpressure < 0) {
+ close();
+ return;
+ } else if(backpressure > 0) {
+
scheduler.schedule(this::startWorker, backpressure,
+ MILLISECONDS);
+ return;
+ }
+ }
+
+ semaphore.release();
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ }
+ if(eventQueue.peek() != null && semaphore.tryAcquire())
{
+ try {
+ startWorker();
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ }
+ }
+ });
+
+ }
+}
Added:
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java?rev=1766040&view=auto
==============================================================================
---
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java
(added)
+++
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java
Fri Oct 21 15:10:51 2016
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.*;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamProvider;
+
+public class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T>
+ implements PushStream<T> {
+
+ private final AbstractPushStreamImpl< ? > previous;
+
+ protected IntermediatePushStreamImpl(PushStreamProvider psp,
+ Executor executor, ScheduledExecutorService scheduler,
+ AbstractPushStreamImpl< ? > previous) {
+ super(psp, executor, scheduler);
+ this.previous = previous;
+ }
+
+ @Override
+ protected boolean begin() {
+ if(closed.compareAndSet(BUILDING, STARTED)) {
+ beginning();
+ previous.begin();
+ return true;
+ }
+ return false;
+ }
+
+ protected void beginning() {
+ // The base implementation has nothing to do, but
+ // this method is used in windowing
+ }
+
+}
Added:
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java?rev=1766040&view=auto
==============================================================================
---
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java
(added)
+++
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java
Fri Oct 21 15:10:51 2016
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.Promises;
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushEventConsumer;
+import org.osgi.util.pushstream.QueuePolicy;
+import org.osgi.util.pushstream.SimplePushEventSource;
+
+public class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ?
extends T>>>
+ implements SimplePushEventSource<T> {
+
+ private final Object
lock = new Object();
+
+ private final Executor
worker;
+
+ private final ScheduledExecutorService
scheduler;
+
+ private final QueuePolicy<T,U>
queuePolicy;
+
+ private final U
queue;
+
+ private final int
parallelism;
+
+ private final Semaphore
semaphore;
+
+ private final List<PushEventConsumer< ? super T>> connected
= new ArrayList<>();
+
+ private final Runnable
onClose;
+
+ private boolean
closed;
+
+ private Deferred<Void>
connectPromise;
+
+ private boolean
waitForFinishes;
+
+
+ public SimplePushEventSourceImpl(Executor worker,
+ ScheduledExecutorService scheduler, QueuePolicy<T,U>
queuePolicy,
+ U queue, int parallelism, Runnable onClose) {
+ this.worker = worker;
+ this.scheduler = scheduler;
+ this.queuePolicy = queuePolicy;
+ this.queue = queue;
+ this.parallelism = parallelism;
+ this.semaphore = new Semaphore(parallelism);
+ this.onClose = onClose;
+ this.closed = false;
+ this.connectPromise = null;
+ }
+
+ @Override
+ public AutoCloseable open(PushEventConsumer< ? super T> pec)
+ throws Exception {
+ Deferred<Void> toResolve = null;
+ synchronized (lock) {
+ if (closed) {
+ throw new IllegalStateException(
+ "This PushEventConsumer is
closed");
+ }
+
+ toResolve = connectPromise;
+ connectPromise = null;
+
+ connected.add(pec);
+ }
+
+ if (toResolve != null) {
+ toResolve.resolve(null);
+ }
+
+ return () -> {
+ closeConsumer(pec, PushEvent.close());
+ };
+ }
+
+ private void closeConsumer(PushEventConsumer< ? super T> pec,
+ PushEvent<T> event) {
+ boolean sendClose;
+ synchronized (lock) {
+ sendClose = connected.remove(pec);
+ }
+ if (sendClose) {
+ doSend(pec, event);
+ }
+ }
+
+ private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T>
event) {
+ try {
+ worker.execute(() -> safePush(pec, event));
+ } catch (RejectedExecutionException ree) {
+ // TODO log?
+ if (!event.isTerminal()) {
+ close(PushEvent.error(ree));
+ } else {
+ safePush(pec, event);
+ }
+ }
+ }
+
+ @SuppressWarnings("boxing")
+ private Promise<Long> doSendWithBackPressure(
+ PushEventConsumer< ? super T> pec, PushEvent<T> event) {
+ Deferred<Long> d = new Deferred<>();
+ try {
+ worker.execute(
+ () -> d.resolve(System.nanoTime() +
safePush(pec, event)));
+ } catch (RejectedExecutionException ree) {
+ // TODO log?
+ if (!event.isTerminal()) {
+ close(PushEvent.error(ree));
+ return Promises.resolved(System.nanoTime());
+ } else {
+ return Promises
+ .resolved(System.nanoTime() +
safePush(pec, event));
+ }
+ }
+ return d.getPromise();
+ }
+
+ private long safePush(PushEventConsumer< ? super T> pec,
+ PushEvent<T> event) {
+ try {
+ long backpressure = pec.accept(event) * 1000000;
+ if (backpressure < 0 && !event.isTerminal()) {
+ closeConsumer(pec, PushEvent.close());
+ return -1;
+ }
+ return backpressure;
+ } catch (Exception e) {
+ // TODO log?
+ if (!event.isTerminal()) {
+ closeConsumer(pec, PushEvent.error(e));
+ }
+ return -1;
+ }
+ }
+
+ @Override
+ public void close() {
+ close(PushEvent.close());
+ }
+
+ private void close(PushEvent<T> event) {
+ List<PushEventConsumer< ? super T>> toClose;
+ Deferred<Void> toFail = null;
+ synchronized (lock) {
+ if(!closed) {
+ closed = true;
+
+ toClose = new ArrayList<>(connected);
+ connected.clear();
+ queue.clear();
+
+ if(connectPromise != null) {
+ toFail = connectPromise;
+ connectPromise = null;
+ }
+ } else {
+ toClose = emptyList();
+ }
+ }
+
+ toClose.stream().forEach(pec -> doSend(pec, event));
+
+ if (toFail != null) {
+ toFail.resolveWith(closedConnectPromise());
+ }
+
+ onClose.run();
+ }
+
+ @Override
+ public void publish(T t) {
+ enqueueEvent(PushEvent.data(t));
+ }
+
+ @Override
+ public void endOfStream() {
+ enqueueEvent(PushEvent.close());
+ }
+
+ @Override
+ public void error(Exception e) {
+ enqueueEvent(PushEvent.error(e));
+ }
+
+ private void enqueueEvent(PushEvent<T> event) {
+ synchronized (lock) {
+ if (closed || connected.isEmpty()) {
+ return;
+ }
+ }
+
+ try {
+ queuePolicy.doOffer(queue, event);
+ boolean start;
+ synchronized (lock) {
+ start = !waitForFinishes &&
semaphore.tryAcquire();
+ }
+ if (start) {
+ startWorker();
+ }
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ throw new IllegalStateException(
+ "The queue policy threw an exception",
e);
+ }
+ }
+
+ @SuppressWarnings({
+ "unchecked", "boxing"
+ })
+ private void startWorker() {
+ worker.execute(() -> {
+ try {
+
+ for(;;) {
+ PushEvent<T> event;
+ List<PushEventConsumer< ? super T>>
toCall;
+ boolean resetWait = false;
+ synchronized (lock) {
+ if(waitForFinishes) {
+ semaphore.release();
+ while(waitForFinishes) {
+
lock.notifyAll();
+ lock.wait();
+ }
+ semaphore.acquire();
+ }
+
+ event = (PushEvent<T>)
queue.poll();
+
+ if(event == null) {
+ break;
+ }
+
+ toCall = new
ArrayList<>(connected);
+ if (event.isTerminal()) {
+ waitForFinishes = true;
+ resetWait = true;
+ connected.clear();
+ while
(!semaphore.tryAcquire(parallelism - 1)) {
+ lock.wait();
+ }
+ }
+ }
+
+ List<Promise<Long>> calls =
toCall.stream().map(pec -> {
+ if (semaphore.tryAcquire()) {
+ try {
+ return
doSendWithBackPressure(pec, event);
+ } finally {
+
semaphore.release();
+ }
+ } else {
+ return
Promises.resolved(
+
System.nanoTime() + safePush(pec, event));
+ }
+ }).collect(toList());
+
+ long toWait =
Promises.<Long,Long>all(calls)
+ .map(l -> l.stream()
+
.max((a,b) -> a.compareTo(b))
+
.orElseGet(() -> System.nanoTime()))
+ .getValue() -
System.nanoTime();
+
+
+ if (toWait > 0) {
+
scheduler.schedule(this::startWorker, toWait,
+ NANOSECONDS);
+ return;
+ }
+
+ if (resetWait == true) {
+ synchronized (lock) {
+ waitForFinishes = false;
+ lock.notifyAll();
+ }
+ }
+ }
+
+ semaphore.release();
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ }
+ if (queue.peek() != null && semaphore.tryAcquire()) {
+ try {
+ startWorker();
+ } catch (Exception e) {
+ close(PushEvent.error(e));
+ }
+ }
+ });
+
+ }
+
+ @Override
+ public boolean isConnected() {
+ synchronized (lock) {
+ return !connected.isEmpty();
+ }
+ }
+
+ @Override
+ public Promise<Void> connectPromise() {
+ synchronized (lock) {
+ if (closed) {
+ return closedConnectPromise();
+ }
+
+ if (connected.isEmpty()) {
+ if (connectPromise == null) {
+ connectPromise = new Deferred<>();
+ }
+ return connectPromise.getPromise();
+ } else {
+ return Promises.resolved(null);
+ }
+ }
+ }
+
+ private Promise<Void> closedConnectPromise() {
+ return Promises.failed(new IllegalStateException(
+ "This SimplePushEventSource is closed"));
+ }
+
+}
Added:
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java?rev=1766040&view=auto
==============================================================================
---
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java
(added)
+++
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java
Fri Oct 21 15:10:51 2016
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.*;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushEventConsumer;
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamProvider;
+
+public class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ?
extends T>>>
+ extends AbstractPushStreamImpl<T> implements PushStream<T> {
+
+ protected final Function<PushEventConsumer<T>,AutoCloseable>
connector;
+
+ protected final AtomicReference<AutoCloseable>
upstream = new AtomicReference<AutoCloseable>();
+
+ public UnbufferedPushStreamImpl(PushStreamProvider psp,
+ Executor executor, ScheduledExecutorService scheduler,
+ Function<PushEventConsumer<T>,AutoCloseable> connector)
{
+ super(psp, executor, scheduler);
+ this.connector = connector;
+ }
+
+ @Override
+ protected boolean close(PushEvent<T> event) {
+ if(super.close(event)) {
+ ofNullable(upstream.getAndSet(() -> {
+ // This block doesn't need to do anything, but
the presence
+ // of the Closable is needed to prevent
duplicate begins
+ })).ifPresent(c -> {
+ try {
+ c.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch
block
+ e.printStackTrace();
+ }
+ });
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected boolean begin() {
+ if(closed.compareAndSet(BUILDING, STARTED)) {
+ AutoCloseable toClose =
connector.apply(this::handleEvent);
+ if(!upstream.compareAndSet(null,toClose)) {
+ //TODO log that we tried to connect twice...
+ try {
+ toClose.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ if (closed.get() == CLOSED
+ && upstream.compareAndSet(toClose,
null)) {
+ // We closed before setting the upstream -
close it now
+ try {
+ toClose.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+}
Added:
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java?rev=1766040&view=auto
==============================================================================
---
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java
(added)
+++
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java
Fri Oct 21 15:10:51 2016
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+abstract class AbstractBufferBuilder<R, T, U extends BlockingQueue<PushEvent<
? extends T>>>
+ implements BufferBuilder<R,T,U> {
+
+ protected Executor worker;
+ protected int concurrency;
+ protected PushbackPolicy<T,U> backPressure;
+ protected QueuePolicy<T,U> bufferingPolicy;
+ protected U buffer;
+
+ @Override
+ public BufferBuilder<R,T,U> withBuffer(U queue) {
+ this.buffer = queue;
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<R,T,U> withQueuePolicy(
+ QueuePolicy<T,U> queuePolicy) {
+ this.bufferingPolicy = queuePolicy;
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<R,T,U> withQueuePolicy(
+ QueuePolicyOption queuePolicyOption) {
+ this.bufferingPolicy = queuePolicyOption.getPolicy();
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<R,T,U> withPushbackPolicy(
+ PushbackPolicy<T,U> pushbackPolicy) {
+ this.backPressure = pushbackPolicy;
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<R,T,U> withPushbackPolicy(
+ PushbackPolicyOption pushbackPolicyOption, long time) {
+ this.backPressure = pushbackPolicyOption.getPolicy(time);
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<R,T,U> withParallelism(int parallelism) {
+ this.concurrency = parallelism;
+ return this;
+ }
+
+ @Override
+ public BufferBuilder<R,T,U> withExecutor(Executor executor) {
+ this.worker = executor;
+ return this;
+ }
+}
Added:
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java?rev=1766040&view=auto
==============================================================================
---
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java
(added)
+++
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java
Fri Oct 21 15:10:51 2016
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+/**
+ * Create a buffered section of a Push-based stream
+ *
+ * @param <R> The type of object being built
+ * @param <T> The type of objects in the {@link PushEvent}
+ * @param <U> The type of the Queue used in the user specified buffer
+ */
+public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<?
extends T>>> {
+
+ /**
+ * The BlockingQueue implementation to use as a buffer
+ *
+ * @param queue
+ * @return this builder
+ */
+ BufferBuilder<R, T, U> withBuffer(U queue);
+
+ /**
+ * Set the {@link QueuePolicy} of this Builder
+ *
+ * @param queuePolicy
+ * @return this builder
+ */
+ BufferBuilder<R,T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy);
+
+ /**
+ * Set the {@link QueuePolicy} of this Builder
+ *
+ * @param queuePolicyOption
+ * @return this builder
+ */
+ BufferBuilder<R, T, U> withQueuePolicy(QueuePolicyOption
queuePolicyOption);
+
+ /**
+ * Set the {@link PushbackPolicy} of this builder
+ *
+ * @param pushbackPolicy
+ * @return this builder
+ */
+ BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicy<T, U>
pushbackPolicy);
+
+ /**
+ * Set the {@link PushbackPolicy} of this builder
+ *
+ * @param pushbackPolicyOption
+ * @param time
+ * @return this builder
+ */
+ BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicyOption
pushbackPolicyOption, long time);
+
+ /**
+ * Set the maximum permitted number of concurrent event deliveries
allowed
+ * from this buffer
+ *
+ * @param parallelism
+ * @return this builder
+ */
+ BufferBuilder<R, T, U> withParallelism(int parallelism);
+
+ /**
+ * Set the {@link Executor} that should be used to deliver events from
this
+ * buffer
+ *
+ * @param executor
+ * @return this builder
+ */
+ BufferBuilder<R, T, U> withExecutor(Executor executor);
+
+ /**
+ * @return the object being built
+ */
+ R create();
+
+}
Added:
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java?rev=1766040&view=auto
==============================================================================
---
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java
(added)
+++
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java
Fri Oct 21 15:10:51 2016
@@ -0,0 +1,205 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import static org.osgi.util.pushstream.PushEvent.EventType.*;
+
+/**
+ * A PushEvent is an immutable object that is transferred through a
+ * communication channel to push information to a downstream consumer. The
event
+ * has three different types:
+ * <ul>
+ * <li>{@link EventType#DATA} â Provides access to a typed data element in
the
+ * stream.
+ * <li>{@link EventType#CLOSE} â The stream is closed. After receiving this
+ * event, no more events will follow.
+ * <li>{@link EventType#ERROR} â The stream ran into an unrecoverable problem
+ * and is sending the reason downstream. The stream is closed and no more
events
+ * will follow after this event.
+ * </ul>
+ *
+ * @param <T> The payload type of the event.
+ * @Immutable
+ */
+public abstract class PushEvent<T> {
+
+ /**
+ * The type of a {@link PushEvent}.
+ */
+ public static enum EventType {
+ /**
+ * A data event forming part of the stream
+ */
+ DATA,
+ /**
+ * An error event that indicates streaming has failed and that
no more
+ * events will arrive
+ */
+ ERROR,
+ /**
+ * An event that indicates that the stream has terminated
normally
+ */
+ CLOSE
+ }
+
+ /**
+ * Package private default constructor.
+ */
+ PushEvent() {}
+
+ /**
+ * Get the type of this event.
+ *
+ * @return The type of this event.
+ */
+ public abstract EventType getType();
+
+ /**
+ * Return the data for this event.
+ *
+ * @return The data payload.
+ * @throws IllegalStateException if this event is not a
+ * {@link EventType#DATA} event.
+ */
+ public T getData() throws IllegalStateException {
+ throw new IllegalStateException(
+ "Not a DATA event, the event type is " +
getType());
+ }
+
+ /**
+ * Return the error that terminated the stream.
+ *
+ * @return The error that terminated the stream.
+ * @throws IllegalStateException if this event is not an
+ * {@link EventType#ERROR} event.
+ */
+ public Exception getFailure() throws IllegalStateException {
+ throw new IllegalStateException(
+ "Not an ERROR event, the event type is " +
getType());
+ }
+
+ /**
+ * Answer if no more events will follow after this event.
+ *
+ * @return {@code false} if this is a data event, otherwise {@code
true}.
+ */
+ public boolean isTerminal() {
+ return true;
+ }
+
+ /**
+ * Create a new data event.
+ *
+ * @param <T> The payload type.
+ * @param payload The payload.
+ * @return A new data event wrapping the specified payload.
+ */
+ public static <T> PushEvent<T> data(T payload) {
+ return new DataEvent<T>(payload);
+ }
+
+ /**
+ * Create a new error event.
+ *
+ * @param <T> The payload type.
+ * @param e The error.
+ * @return A new error event with the specified error.
+ */
+ public static <T> PushEvent<T> error(Exception e) {
+ return new ErrorEvent<T>(e);
+ }
+
+ /**
+ * Create a new close event.
+ *
+ * @param <T> The payload type.
+ * @return A new close event.
+ */
+ public static <T> PushEvent<T> close() {
+ return new CloseEvent<T>();
+ }
+
+ /**
+ * Convenience to cast a close/error event to another payload type.
Since
+ * the payload type is not needed for these events this is harmless.
This
+ * therefore allows you to forward the close/error event downstream
without
+ * creating anew event.
+ *
+ * @param <X> The new payload type.
+ * @return The current error or close event mapped to a new payload
type.
+ * @throws IllegalStateException if the event is a {@link
EventType#DATA}
+ * event.
+ */
+ public <X> PushEvent<X> nodata() throws IllegalStateException {
+ @SuppressWarnings("unchecked")
+ PushEvent<X> result = (PushEvent<X>) this;
+ return result;
+ }
+
+ static final class DataEvent<T> extends PushEvent<T> {
+ private final T data;
+
+ DataEvent(T data) {
+ this.data = data;
+ }
+
+ @Override
+ public T getData() throws IllegalStateException {
+ return data;
+ }
+
+ @Override
+ public EventType getType() {
+ return DATA;
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return false;
+ }
+
+ @Override
+ public <X> PushEvent<X> nodata() throws IllegalStateException {
+ throw new IllegalStateException("This event is a DATA
event");
+ }
+ }
+
+ static final class ErrorEvent<T> extends PushEvent<T> {
+ private final Exception error;
+
+ ErrorEvent(Exception error) {
+ this.error = error;
+ }
+
+ @Override
+ public Exception getFailure() {
+ return error;
+ }
+
+ @Override
+ public EventType getType() {
+ return ERROR;
+ }
+ }
+
+ static final class CloseEvent<T> extends PushEvent<T> {
+ @Override
+ public EventType getType() {
+ return CLOSE;
+ }
+ }
+}
Added:
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java?rev=1766040&view=auto
==============================================================================
---
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java
(added)
+++
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java
Fri Oct 21 15:10:51 2016
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * An Async Event Consumer asynchronously receives Data events until it
receives
+ * either a Close or Error event.
+ *
+ * @param <T>
+ * The type for the event payload
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushEventConsumer<T> {
+
+ /**
+ * If ABORT is used as return value, the sender should close the
channel all
+ * the way to the upstream source. The ABORT will not guarantee that no
+ * more events are delivered since this is impossible in a concurrent
+ * environment. The consumer should accept subsequent events and
close/clean
+ * up when the Close or Error event is received.
+ *
+ * Though ABORT has the value -1, any value less than 0 will act as an
+ * abort.
+ */
+ long ABORT = -1;
+
+ /**
+ * A 0 indicates that the consumer is willing to receive subsequent
events
+ * at full speeds.
+ *
+ * Any value more than 0 will indicate that the consumer is becoming
+ * overloaded and wants a delay of the given milliseconds before the
next
+ * event is sent. This allows the consumer to pushback the event
delivery
+ * speed.
+ */
+ long CONTINUE = 0;
+
+ /**
+ * Accept an event from a source. Events can be delivered on multiple
+ * threads simultaneously. However, Close and Error events are the last
+ * events received, no more events must be sent after them.
+ *
+ * @param event The event
+ * @return less than 0 means abort, 0 means continue, more than 0 means
+ * delay ms
+ * @throws Exception to indicate that an error has occured and that no
+ * further events should be delivered to this
+ * {@link PushEventConsumer}
+ */
+ long accept(PushEvent<? extends T> event) throws Exception;
+
+}
Added:
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java
URL:
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java?rev=1766040&view=auto
==============================================================================
---
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java
(added)
+++
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java
Fri Oct 21 15:10:51 2016
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * An event source. An event source can open a channel between a source and a
+ * consumer. Once the channel is opened (even before it returns) the source can
+ * send events to the consumer.
+ *
+ * A source should stop sending and automatically close the channel when
sending
+ * an event returns a negative value, see {@link PushEventConsumer#ABORT}.
+ * Values that are larger than 0 should be treated as a request to delay the
+ * next events with those number of milliseconds.
+ *
+ * @param <T>
+ * The payload type
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushEventSource<T> {
+
+ /**
+ * Open the asynchronous channel between the source and the consumer.
The
+ * call returns an {@link AutoCloseable}. This can be closed, and should
+ * close the channel, including sending a Close event if the channel
was not
+ * already closed. The returned object must be able to be closed
multiple
+ * times without sending more than one Close events.
+ *
+ * @param aec the consumer (not null)
+ * @return a {@link AutoCloseable} that can be used to close the stream
+ * @throws Exception
+ */
+ AutoCloseable open(PushEventConsumer< ? super T> aec) throws Exception;
+}