Author: timothyjward
Date: Fri Oct 21 15:10:51 2016
New Revision: 1766040

URL: http://svn.apache.org/viewvc?rev=1766040&view=rev
Log:
[pushstream] Initial contribution of Push Streams

Added:
    aries/trunk/pushstream/
    aries/trunk/pushstream/README.md
    aries/trunk/pushstream/pom.xml
    aries/trunk/pushstream/pushstream/
    aries/trunk/pushstream/pushstream/bnd.bnd
    aries/trunk/pushstream/pushstream/pom.xml
    aries/trunk/pushstream/pushstream/src/
    aries/trunk/pushstream/pushstream/src/main/
    aries/trunk/pushstream/pushstream/src/main/java/
    aries/trunk/pushstream/pushstream/src/main/java/org/
    aries/trunk/pushstream/pushstream/src/main/java/org/apache/
    aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/
    aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/
    
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/
    aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStream.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java
    
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java
Modified:
    aries/trunk/pom.xml

Modified: aries/trunk/pom.xml
URL: 
http://svn.apache.org/viewvc/aries/trunk/pom.xml?rev=1766040&r1=1766039&r2=1766040&view=diff
==============================================================================
--- aries/trunk/pom.xml (original)
+++ aries/trunk/pom.xml Fri Oct 21 15:10:51 2016
@@ -59,6 +59,7 @@
         <module>esa-maven-plugin</module>
         <module>async</module>
         <module>tx-control</module>
+        <module>pushstream</module>
     </modules>
 
     <build>

Added: aries/trunk/pushstream/README.md
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/README.md?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/README.md (added)
+++ aries/trunk/pushstream/README.md Fri Oct 21 15:10:51 2016
@@ -0,0 +1,9 @@
+Sample OSGi Pushstream implementation
+-------------------------------------
+
+This project is a prototype implementation of the OSGi Pushstream 
specification.
+
+OSGi Push Streams (RFC-216 
https://github.com/osgi/design/tree/master/rfcs/rfc0216) are an in-progress RFC 
publicly available from the OSGi Alliance. They are also described in chapter 
706 of the OSGi R7 Early Draft 
https://osgi.org/download/osgi.cmpn-7.0.0-earlydraft1.pdf
+
+Given that the RFC is non-final the OSGi API declared in this project is 
subject to change at any time up to its official release. Also the behaviour of 
this implementation may not always be up-to-date with the latest wording in the 
RFC. The project maintainers will, however try to keep pace with the RFC, and 
to ensure that the implementations are compliant with any OSGi specifications 
that result from the RFC.
+

Added: aries/trunk/pushstream/pom.xml
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pom.xml?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pom.xml (added)
+++ aries/trunk/pushstream/pom.xml Fri Oct 21 15:10:51 2016
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+     http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+       <parent>
+               <groupId>org.apache</groupId>
+               <artifactId>apache</artifactId>
+               <version>17</version>
+               <relativePath />
+       </parent>
+
+       <groupId>org.apache.aries.pushstream</groupId>
+       <artifactId>parent</artifactId>
+       <version>0.0.1-SNAPSHOT</version>
+       <packaging>pom</packaging>
+       <description>Apache Aries Push Stream Parent</description>
+       <scm>
+               <connection>
+                       
scm:svn:http://svn.apache.org/repos/asf/aries/trunk/pushstream
+               </connection>
+               <developerConnection>
+                       
scm:svn:https://svn.apache.org/repos/asf/aries/trunk/pushstream
+               </developerConnection>
+               <url>
+                       http://svn.apache.org/viewvc/aries/trunk/pushstream
+               </url>
+       </scm>
+
+       <profiles>
+               <profile>
+                       <id>jdk18</id>
+                       <activation>
+                               <jdk>1.8</jdk>
+                       </activation>
+                       <modules>
+                               <module>pushstream</module>
+                       </modules>
+               </profile>
+       </profiles>
+
+       <dependencyManagement>
+               <dependencies>
+                       <dependency>
+                               <groupId>org.osgi</groupId>
+                               <artifactId>osgi.annotation</artifactId>
+                               <version>6.0.1</version>
+                               <scope>provided</scope>
+                       </dependency>
+                       <dependency>
+                               <groupId>org.apache.aries.async</groupId>
+                               
<artifactId>org.apache.aries.async.promise.api</artifactId>
+                               <version>1.0.1</version>
+                       </dependency>
+                       <dependency>
+                               <groupId>org.slf4j</groupId>
+                               <artifactId>slf4j-api</artifactId>
+                               <version>1.7.0</version>
+                       </dependency>
+                       <dependency>
+                               <groupId>junit</groupId>
+                               <artifactId>junit</artifactId>
+                               <version>4.11</version>
+                               <scope>test</scope>
+                       </dependency>
+                       <dependency>
+                               <groupId>org.mockito</groupId>
+                               <artifactId>mockito-all</artifactId>
+                               <version>1.9.5</version>
+                               <scope>test</scope>
+                       </dependency>
+               </dependencies>
+       </dependencyManagement>
+
+       <build>
+               <pluginManagement>
+                       <plugins>
+                               <plugin>
+                                       
<artifactId>maven-compiler-plugin</artifactId>
+                                       <configuration>
+                                               <source>1.8</source>
+                                               <target>1.8</target>
+                                       </configuration>
+                               </plugin>
+                               <plugin>
+                                       
<groupId>org.apache.maven.plugins</groupId>
+                                       
<artifactId>maven-jar-plugin</artifactId>
+                                       <configuration>
+                                               
<useDefaultManifestFile>true</useDefaultManifestFile>
+                                       </configuration>
+                               </plugin>
+                               <plugin>
+                                       
<groupId>org.apache.maven.plugins</groupId>
+                                       
<artifactId>maven-javadoc-plugin</artifactId>
+                                       <configuration>
+                                               <source>1.8</source>
+                                       </configuration>
+                               </plugin>
+                               <plugin>
+                                       <groupId>biz.aQute.bnd</groupId>
+                                       
<artifactId>bnd-maven-plugin</artifactId>
+                                       <version>3.2.0</version>
+                                       <executions>
+                                               <execution>
+                                                       
<id>default-bnd-process</id>
+                                                       <goals>
+                                                               
<goal>bnd-process</goal>
+                                                       </goals>
+                                               </execution>
+                                       </executions>
+                               </plugin>
+                       </plugins>
+               </pluginManagement>
+       </build>
+</project>

Added: aries/trunk/pushstream/pushstream/bnd.bnd
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/bnd.bnd?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/bnd.bnd (added)
+++ aries/trunk/pushstream/pushstream/bnd.bnd Fri Oct 21 15:10:51 2016
@@ -0,0 +1 @@
+Export-Package: ${packages;VERSIONED}

Added: aries/trunk/pushstream/pushstream/pom.xml
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/pom.xml?rev=1766040&view=auto
==============================================================================
--- aries/trunk/pushstream/pushstream/pom.xml (added)
+++ aries/trunk/pushstream/pushstream/pom.xml Fri Oct 21 15:10:51 2016
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+     http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+       <parent>
+               <groupId>org.apache.aries.pushstream</groupId>
+               <artifactId>parent</artifactId>
+               <version>0.0.1-SNAPSHOT</version>
+               <relativePath>../pom.xml</relativePath>
+       </parent>
+       <groupId>org.apache.aries.pushstream</groupId>
+       <artifactId>pushstream</artifactId>
+       <name>Apache Aries Push Streams</name>
+       <version>0.0.1-SNAPSHOT</version>
+
+
+       <description>
+        This bundle contains the Apache Aries OSGi Push Stream implementation.
+    </description>
+
+       <scm>
+               <connection>
+            
scm:svn:http://svn.apache.org/repos/asf/aries/trunk/pushstream/pushstream
+        </connection>
+               <developerConnection>
+            
scm:svn:https://svn.apache.org/repos/asf/aries/trunk/pushstream/pushstream
+        </developerConnection>
+               <url>
+            http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream
+        </url>
+       </scm>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.osgi</groupId>
+                       <artifactId>osgi.annotation</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.aries.async</groupId>
+                       
<artifactId>org.apache.aries.async.promise.api</artifactId>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>biz.aQute.bnd</groupId>
+                               <artifactId>bnd-maven-plugin</artifactId>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/AbstractPushStreamImpl.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,1374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.*;
+import static org.osgi.util.pushstream.PushEventConsumer.*;
+
+import java.time.Duration;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntSupplier;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushEventConsumer;
+import org.osgi.util.pushstream.PushEventSource;
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamBuilder;
+import org.osgi.util.pushstream.PushStreamProvider;
+import org.osgi.util.pushstream.PushEvent.EventType;
+
+public abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
+       
+       public static enum State {
+               BUILDING, STARTED, CLOSED
+       }
+       
+       protected final PushStreamProvider                                      
                        psp;
+       
+       protected final Executor                                                
                                defaultExecutor;
+       protected final ScheduledExecutorService                                
                scheduler;
+
+       protected final AtomicReference<State> closed = new 
AtomicReference<>(BUILDING);
+       
+       protected final AtomicReference<PushEventConsumer<T>>                   
next                    = new AtomicReference<>();
+       
+       protected final AtomicReference<Runnable> onCloseCallback = new 
AtomicReference<>();
+       protected final AtomicReference<Consumer<? super Throwable>> 
onErrorCallback = new AtomicReference<>();
+
+       protected abstract boolean begin();
+       
+       protected AbstractPushStreamImpl(PushStreamProvider psp,
+                       Executor executor, ScheduledExecutorService scheduler) {
+               this.psp = psp;
+               this.defaultExecutor = executor;
+               this.scheduler = scheduler;
+       }
+
+       protected long handleEvent(PushEvent< ? extends T> event) {
+               if(closed.get() != CLOSED) {
+                       try {
+                               if(event.isTerminal()) {
+                                       close(event.nodata());
+                                       return ABORT;
+                               } else {
+                                       PushEventConsumer<T> consumer = 
next.get();
+                                       long val;
+                                       if(consumer == null) {
+                                               //TODO log a warning
+                                               val = CONTINUE;
+                                       } else {
+                                               val = consumer.accept(event);
+                                       }
+                                       if(val < 0) {
+                                               close();
+                                       }
+                                       return val;
+                               }
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               }
+               return ABORT;
+       }
+       
+       @Override
+       public void close() {
+               close(PushEvent.close());
+       }
+       
+       protected boolean close(PushEvent<T> event) {
+               if(!event.isTerminal()) {
+                       throw new IllegalArgumentException("The event " + event 
 + " is not a close event.");
+               }
+               if(closed.getAndSet(CLOSED) != CLOSED) {
+                       PushEventConsumer<T> aec = next.getAndSet(null);
+                       if(aec != null) {
+                               try {
+                                       aec.accept(event);
+                               } catch (Exception e) {
+                                       // TODO Auto-generated catch block
+                                       e.printStackTrace();
+                               }
+                       }
+                       Runnable handler = onCloseCallback.getAndSet(null);
+                       if(handler != null) {
+                               try {
+                                       handler.run();
+                               } catch (Exception e) {
+                                       // TODO Auto-generated catch block
+                                       e.printStackTrace();
+                               }
+                       }
+                       if (event.getType() == EventType.ERROR) {
+                               Consumer<? super Throwable> errorHandler = 
onErrorCallback.getAndSet(null);
+                               if(errorHandler != null) {
+                                       try {
+                                               
errorHandler.accept(event.getFailure());
+                                       } catch (Exception e) {
+                                               // TODO Auto-generated catch 
block
+                                               e.printStackTrace();
+                                       }
+                               }
+                       }
+                       return true;
+               }
+               return false;
+       }
+       
+       @Override
+       public PushStream<T> onClose(Runnable closeHandler) {
+               if(onCloseCallback.compareAndSet(null, closeHandler)) {
+                       if(closed.get() == State.CLOSED && 
onCloseCallback.compareAndSet(closeHandler, null)) {
+                               closeHandler.run();
+                       }
+               } else {
+                       throw new IllegalStateException("A close handler has 
already been defined for this stream object");
+               }
+               return this;
+       }
+
+       @Override
+       public PushStream<T> onError(Consumer< ? super Throwable> closeHandler) 
{
+               if(onErrorCallback.compareAndSet(null, closeHandler)) {
+                       if(closed.get() == State.CLOSED) { 
+                               //TODO log already closed
+                               onErrorCallback.set(null);
+                       }
+               } else {
+                       throw new IllegalStateException("A close handler has 
already been defined for this stream object");
+               }
+               return this;
+       }
+
+       private void updateNext(PushEventConsumer<T> consumer) {
+               if(!next.compareAndSet(null, consumer)) {
+                       throw new IllegalStateException("This stream has 
already been chained");
+               } else if(closed.get() == CLOSED && 
next.compareAndSet(consumer, null)) {
+                       try {
+                               consumer.accept(PushEvent.close());
+                       } catch (Exception e) {
+                               //TODO log
+                               e.printStackTrace();
+                       }
+               }
+       }
+
+       @Override
+       public PushStream<T> filter(Predicate< ? super T> predicate) {
+               AbstractPushStreamImpl<T> eventStream = new 
IntermediatePushStreamImpl<>(
+                               psp, defaultExecutor, scheduler, this);
+               updateNext((event) -> {
+                       try {
+                               if (!event.isTerminal()) {
+                                       if (predicate.test(event.getData())) {
+                                               return 
eventStream.handleEvent(event);
+                                       } else {
+                                               return CONTINUE;
+                                       }
+                               }
+                               return eventStream.handleEvent(event);
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               return eventStream;
+       }
+
+       @Override
+       public <R> PushStream<R> map(Function< ? super T, ? extends R> mapper) {
+               
+               AbstractPushStreamImpl<R> eventStream = new 
IntermediatePushStreamImpl<>(
+                               psp, defaultExecutor, scheduler, this);
+               updateNext(event -> {
+                       try {
+                               if (!event.isTerminal()) {
+                                       return eventStream.handleEvent(
+                                                       
PushEvent.data(mapper.apply(event.getData())));
+                               } else {
+                                       return 
eventStream.handleEvent(event.nodata());
+                               }
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               return eventStream;
+       }
+
+       @Override
+       public <R> PushStream<R> flatMap(
+                       Function< ? super T, ? extends PushStream< ? extends 
R>> mapper) {
+               AbstractPushStreamImpl<R> eventStream = new 
IntermediatePushStreamImpl<>(
+                               psp, defaultExecutor, scheduler, this);
+
+               PushEventConsumer<R> consumer = e -> {
+                       switch (e.getType()) {
+                               case ERROR :
+                                       close(e.nodata());
+                                       return ABORT;
+                               case CLOSE :
+                                       // Close should allow the next flat 
mapped entry
+                                       // without closing the stream;
+                                       return ABORT;
+                               case DATA :
+                                       long returnValue = 
eventStream.handleEvent(e);
+                                       if (returnValue < 0) {
+                                               close();
+                                               return ABORT;
+                                       }
+                                       return returnValue;
+                               default :
+                                       throw new IllegalArgumentException(
+                                                       "The event type " + 
e.getType() + " is unknown");
+                       }
+               };
+
+               updateNext(event -> {
+                       try {
+                               if (!event.isTerminal()) {
+                                       PushStream< ? extends R> mappedStream = 
mapper
+                                                       .apply(event.getData());
+
+                                       return 
mappedStream.forEachEvent(consumer)
+                                                       .getValue()
+                                                       .longValue();
+                               } else {
+                                       return 
eventStream.handleEvent(event.nodata());
+                               }
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               return eventStream;
+       }
+
+       @Override
+       public PushStream<T> distinct() {
+               Set<T> set = Collections.<T>newSetFromMap(new 
ConcurrentHashMap<>());
+               return filter(set::add);
+       }
+
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Override
+       public PushStream<T> sorted() {
+               return sorted((Comparator)Comparator.naturalOrder());
+       }
+
+       @Override
+       public PushStream<T> sorted(Comparator< ? super T> comparator) {
+               List<T> list = Collections.synchronizedList(new ArrayList<>());
+               AbstractPushStreamImpl<T> eventStream = new 
IntermediatePushStreamImpl<>(
+                               psp, defaultExecutor, scheduler, this);
+               updateNext(event -> {
+                       try {
+                               switch(event.getType()) {
+                                       case DATA : 
+                                               list.add(event.getData());
+                                               return CONTINUE;
+                                       case CLOSE :
+                                               list.sort(comparator);
+                                               for(T t : list) {
+                                                       
eventStream.handleEvent(PushEvent.data(t));
+                                               }
+                                               return ABORT;
+                                       case ERROR :
+                                               return 
eventStream.handleEvent(event.nodata());
+                               }
+                               return eventStream.handleEvent(event.nodata());
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               return eventStream;
+       }
+
+       @Override
+       public PushStream<T> limit(long maxSize) {
+               if(maxSize <= 0) {
+                       throw new IllegalArgumentException("The limit must be 
greater than zero");
+               }
+               AbstractPushStreamImpl<T> eventStream = new 
IntermediatePushStreamImpl<>(
+                               psp, defaultExecutor, scheduler, this);
+               AtomicLong counter = new AtomicLong(maxSize);
+               updateNext(event -> {
+                       try {
+                               if (!event.isTerminal()) {
+                                       long count = counter.decrementAndGet();
+                                       if (count > 0) {
+                                               return 
eventStream.handleEvent(event);
+                                       } else if (count == 0) {
+                                               eventStream.handleEvent(event);
+                                       }
+                                       return ABORT;
+                               } else {
+                                       return 
eventStream.handleEvent(event.nodata());
+                               }
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               return eventStream;
+       }
+
+       @Override
+       public PushStream<T> skip(long n) {
+               if(n <= 0) {
+                       throw new IllegalArgumentException("The number to skip 
must be greater than zero");
+               }
+               AbstractPushStreamImpl<T> eventStream = new 
IntermediatePushStreamImpl<>(
+                               psp, defaultExecutor, scheduler, this);
+               AtomicLong counter = new AtomicLong(n);
+               updateNext(event -> {
+                       try {
+                               if (!event.isTerminal()) {
+                                       if (counter.get() > 0 && 
counter.decrementAndGet() >= 0) {
+                                               return CONTINUE;
+                                       } else {
+                                               return 
eventStream.handleEvent(event);
+                                       }                               
+                               } else {
+                                       return 
eventStream.handleEvent(event.nodata());
+                               }
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               return eventStream;
+       }
+
+       @Override
+       public PushStream<T> fork(int n, int delay, Executor ex) {
+               AbstractPushStreamImpl<T> eventStream = new 
IntermediatePushStreamImpl<>(
+                               psp, ex, scheduler, this);
+               Semaphore s = new Semaphore(n);
+               updateNext(event -> {
+                       try {
+                               if (event.isTerminal()) {
+                                       s.acquire(n);
+                                       eventStream.close(event.nodata());
+                                       return ABORT;
+                               }
+       
+                               s.acquire(1);
+       
+                               ex.execute(() -> {
+                                       try {
+                                               if 
(eventStream.handleEvent(event) < 0) {
+                                                       
eventStream.close(PushEvent.close());
+                                               }
+                                       } catch (Exception e1) {
+                                               close(PushEvent.error(e1));
+                                       } finally {
+                                               s.release(1);
+                                       }
+                               });
+       
+                               return s.getQueueLength() * delay;
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+
+               return eventStream;
+       }
+       
+       @Override
+       public PushStream<T> buffer() {
+               return psp.createStream(c -> {
+                       forEachEvent(c);
+                       return this;
+               });
+       }
+
+       @Override
+       public <U extends BlockingQueue<PushEvent< ? extends T>>> 
PushStreamBuilder<T,U> buildBuffer() {
+               return psp.buildStream(c -> {
+                       forEachEvent(c);
+                       return this;
+               });
+       }
+
+       @Override
+       public PushStream<T> merge(
+                       PushEventSource< ? extends T> source) {
+               AbstractPushStreamImpl<T> eventStream = new 
IntermediatePushStreamImpl<>(
+                               psp, defaultExecutor, scheduler, this);
+               AtomicInteger count = new AtomicInteger(2);
+               PushEventConsumer<T> consumer = event -> {
+                       try {
+                               if (!event.isTerminal()) {
+                                       return eventStream.handleEvent(event);
+                               }
+       
+                               if (count.decrementAndGet() == 0) {
+                                       eventStream.handleEvent(event.nodata());
+                                       return ABORT;
+                               }
+                               return CONTINUE;
+                       } catch (Exception e) {
+                               PushEvent<T> error = PushEvent.error(e);
+                               close(error);
+                               eventStream.close(event.nodata());
+                               return ABORT;
+                       }
+               };
+               updateNext(consumer);
+               AutoCloseable second;
+               try {
+                       second = source.open((PushEvent< ? extends T> event) -> 
{
+                               return consumer.accept(event);
+                       });
+               } catch (Exception e) {
+                       // TODO Auto-generated catch block
+                       e.printStackTrace();
+                       throw new IllegalStateException(
+                                       "Unable to merge events as the event 
source could not be opened.",
+                                       e);
+               }
+               
+               return eventStream.onClose(() -> {
+                       try {
+                               second.close();
+                       } catch (Exception e) {
+                               // TODO Auto-generated catch block
+                               e.printStackTrace();
+                       } 
+               }).map(Function.identity());
+       }
+
+       @Override
+       public PushStream<T> merge(PushStream< ? extends T> source) {
+               AbstractPushStreamImpl<T> eventStream = new 
IntermediatePushStreamImpl<>(
+                               psp, defaultExecutor, scheduler, this);
+               AtomicInteger count = new AtomicInteger(2);
+               PushEventConsumer<T> consumer = event -> {
+                       try {
+                               if (!event.isTerminal()) {
+                                       return eventStream.handleEvent(event);
+                               }
+                               
+                               if (count.decrementAndGet() == 0) {
+                                       eventStream.handleEvent(event.nodata());
+                                       return ABORT;
+                               }
+                               return CONTINUE;
+                       } catch (Exception e) {
+                               PushEvent<T> error = PushEvent.error(e);
+                               close(error);
+                               eventStream.close(event.nodata());
+                               return ABORT;
+                       }
+               };
+               updateNext(consumer);
+               try {
+                       source.forEachEvent(event -> {
+                               return consumer.accept(event);
+                       }).then(p -> {
+                               count.decrementAndGet();
+                               consumer.accept(PushEvent.close());
+                               return null;
+                       }, p -> {
+                               count.decrementAndGet();
+                               consumer.accept(PushEvent.error((Exception) 
p.getFailure()));
+                       });
+               } catch (Exception e) {
+                       // TODO Auto-generated catch block
+                       e.printStackTrace();
+                       throw new IllegalStateException(
+                                       "Unable to merge events as the event 
source could not be opened.",
+                                       e);
+               }
+               
+               return eventStream.onClose(() -> {
+                       try {
+                               source.close();
+                       } catch (Exception e) {
+                               // TODO Auto-generated catch block
+                               e.printStackTrace();
+                       } 
+               }).map(Function.identity());
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public PushStream<T>[] split(Predicate< ? super T>... predicates) {
+               Predicate<? super T>[] tests = Arrays.copyOf(predicates, 
predicates.length);
+               AbstractPushStreamImpl<T>[] rsult = new 
AbstractPushStreamImpl[tests.length];
+               for(int i = 0; i < tests.length; i++) {
+                       rsult[i] = new IntermediatePushStreamImpl<>(psp, 
defaultExecutor,
+                                       scheduler, this);
+               }
+               AtomicReferenceArray<Boolean> off = new 
AtomicReferenceArray<>(tests.length);
+               AtomicInteger count = new AtomicInteger(tests.length);
+               updateNext(event -> {
+                       if (!event.isTerminal()) {
+                               long delay = CONTINUE;
+                               for (int i = 0; i < tests.length; i++) {
+                                       try {
+                                               if (off.get(i).booleanValue()
+                                                               && 
tests[i].test(event.getData())) {
+                                                       long accept = 
rsult[i].handleEvent(event);
+                                                       if (accept < 0) {
+                                                               off.set(i, 
Boolean.TRUE);
+                                                               
count.decrementAndGet();
+                                                       } else if (accept > 
delay) {
+                                                               accept = delay;
+                                                       }
+                                               }
+                                       } catch (Exception e) {
+                                               try {
+                                                       
rsult[i].close(PushEvent.error(e));
+                                               } catch (Exception e2) {
+                                                       //TODO log
+                                               }
+                                               off.set(i, Boolean.TRUE);
+                                       }
+                               }
+                               if (count.get() == 0)
+                                       return ABORT;
+
+                               return delay;
+                       }
+                       for (AbstractPushStreamImpl<T> as : rsult) {
+                               try {
+                                       as.handleEvent(event.nodata());
+                               } catch (Exception e) {
+                                       try {
+                                               as.close(PushEvent.error(e));
+                                       } catch (Exception e2) {
+                                               //TODO log
+                                       }
+                               }
+                       }
+                       return ABORT;
+               });
+               return Arrays.copyOf(rsult, tests.length);
+       }
+
+       @Override
+       public PushStream<T> sequential() {
+               AbstractPushStreamImpl<T> eventStream = new 
IntermediatePushStreamImpl<>(
+                               psp, defaultExecutor, scheduler, this);
+               Lock lock = new ReentrantLock();
+               updateNext((event) -> {
+                       try {
+                               lock.lock();
+                               try {
+                                       return eventStream.handleEvent(event);
+                               } finally {
+                                       lock.unlock();
+                               }
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               return eventStream;
+       }
+
+       @Override
+       public <R> PushStream<R> coalesce(
+                       Function< ? super T,Optional<R>> accumulator) {
+               AbstractPushStreamImpl<R> eventStream = new 
IntermediatePushStreamImpl<>(
+                               psp, defaultExecutor, scheduler, this);
+               updateNext((event) -> {
+                       try {
+                               if (!event.isTerminal()) {
+                                       Optional<PushEvent<R>> coalesced = 
accumulator
+                                                       
.apply(event.getData()).map(PushEvent::data);
+                                       if (coalesced.isPresent()) {
+                                               try {
+                                                       return 
eventStream.handleEvent(coalesced.get());
+                                               } catch (Exception ex) {
+                                                       
close(PushEvent.error(ex));
+                                                       return ABORT;
+                                               }
+                                       } else {
+                                               return CONTINUE;
+                                       }
+                               }
+                               return eventStream.handleEvent(event.nodata());
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               return eventStream;
+       }
+
+       @Override
+       public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> 
f) {
+               if (count <= 0)
+                       throw new IllegalArgumentException(
+                                       "A coalesce operation must collect a 
positive number of events");
+               // This could be optimised to only use a single collection 
queue.
+               // It would save some GC, but is it worth it?
+               return coalesce(() -> count, f);
+       }
+
+       @Override
+       public <R> PushStream<R> coalesce(IntSupplier count,
+                       Function<Collection<T>,R> f) {
+               AtomicReference<Queue<T>> queueRef = new 
AtomicReference<Queue<T>>(
+                               null);
+
+               Runnable init = () -> queueRef
+                               
.set(getQueueForInternalBuffering(count.getAsInt()));
+
+               @SuppressWarnings("resource")
+               AbstractPushStreamImpl<R> eventStream = new 
IntermediatePushStreamImpl<R>(
+                               psp, defaultExecutor, scheduler, this) {
+                       @Override
+                       protected void beginning() {
+                               init.run();
+                       }
+               };
+
+               AtomicBoolean endPending = new AtomicBoolean();
+               Object lock = new Object();
+               updateNext((event) -> {
+                       try {
+                               Queue<T> queue;
+                               if (!event.isTerminal()) {
+                                       synchronized (lock) {
+                                               for (;;) {
+                                                       queue = queueRef.get();
+                                                       if (queue == null) {
+                                                               if 
(endPending.get()) {
+                                                                       return 
ABORT;
+                                                               } else {
+                                                                       
continue;
+                                                               }
+                                                       } else if 
(queue.offer(event.getData())) {
+                                                               return CONTINUE;
+                                                       } else {
+                                                               
queueRef.lazySet(null);
+                                                               break;
+                                                       }
+                                               }
+                                       }
+
+                                       queueRef.set(
+                                                       
getQueueForInternalBuffering(count.getAsInt()));
+
+                                       // This call is on the same thread and 
so must happen
+                                       // outside
+                                       // the synchronized block.
+                                       return aggregateAndForward(f, 
eventStream, event,
+                                                       queue);
+                               } else {
+                                       synchronized (lock) {
+                                               queue = queueRef.get();
+                                               queueRef.lazySet(null);
+                                               endPending.set(true);
+                                       }
+                                       if (queue != null) {
+                                               eventStream.handleEvent(
+                                                               
PushEvent.data(f.apply(queue)));
+                                       }
+                               }
+                               return eventStream.handleEvent(event.nodata());
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               return eventStream;
+       }
+
+       private <R> long aggregateAndForward(Function<Collection<T>,R> f,
+                       AbstractPushStreamImpl<R> eventStream,
+                       PushEvent< ? extends T> event, Queue<T> queue) {
+               if (!queue.offer(event.getData())) {
+                       ((ArrayQueue<T>) queue).forcePush(event.getData());
+               }
+               return eventStream.handleEvent(PushEvent.data(f.apply(queue)));
+       }
+       
+       
+       @Override
+       public <R> PushStream<R> window(Duration time,
+                       Function<Collection<T>,R> f) {
+               return window(time, defaultExecutor, f);
+       }
+
+       @Override
+       public <R> PushStream<R> window(Duration time, Executor executor,
+                       Function<Collection<T>,R> f) {
+               return window(() -> time, () -> 0, executor, (t, c) -> 
f.apply(c));
+       }
+
+       @Override
+       public <R> PushStream<R> window(Supplier<Duration> time,
+                       IntSupplier maxEvents,
+                       BiFunction<Long,Collection<T>,R> f) {
+               return window(time, maxEvents, defaultExecutor, f);
+       }
+
+       @Override
+       public <R> PushStream<R> window(Supplier<Duration> time,
+                       IntSupplier maxEvents, Executor ex,
+                       BiFunction<Long,Collection<T>,R> f) {
+
+               AtomicLong timestamp = new AtomicLong();
+               AtomicLong counter = new AtomicLong();
+               Object lock = new Object();
+               AtomicReference<Queue<T>> queueRef = new 
AtomicReference<Queue<T>>(
+                               null);
+
+               // This code is declared as a separate block to avoid any 
confusion
+               // about which instance's methods and variables are in scope
+               Consumer<AbstractPushStreamImpl<R>> begin = p -> {
+
+                       synchronized (lock) {
+                               timestamp.lazySet(System.nanoTime());
+                               long count = counter.get();
+
+
+                               scheduler.schedule(
+                                               getWindowTask(p, f, time, 
maxEvents, lock, count,
+                                                               queueRef, 
timestamp, counter, ex),
+                                               time.get().toNanos(), 
NANOSECONDS);
+                       }
+
+                       
queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
+               };
+
+               @SuppressWarnings("resource")
+               AbstractPushStreamImpl<R> eventStream = new 
IntermediatePushStreamImpl<R>(
+                               psp, ex, scheduler, this) {
+                       @Override
+                       protected void beginning() {
+                               begin.accept(this);
+                       }
+               };
+
+               AtomicBoolean endPending = new AtomicBoolean(false);
+               updateNext((event) -> {
+                       try {
+                               if (eventStream.closed.get() == CLOSED) {
+                                       return ABORT;
+                               }
+                               Queue<T> queue;
+                               if (!event.isTerminal()) {
+                                       long elapsed;
+                                       long newCount;
+                                       synchronized (lock) {
+                                               for (;;) {
+                                                       queue = queueRef.get();
+                                                       if (queue == null) {
+                                                               if 
(endPending.get()) {
+                                                                       return 
ABORT;
+                                                               } else {
+                                                                       
continue;
+                                                               }
+                                                       } else if 
(queue.offer(event.getData())) {
+                                                               return CONTINUE;
+                                                       } else {
+                                                               
queueRef.lazySet(null);
+                                                               break;
+                                                       }
+                                               }
+
+                                               long now = System.nanoTime();
+                                               elapsed = now - timestamp.get();
+                                               timestamp.lazySet(now);
+                                               newCount = counter.get() + 1;
+                                               counter.lazySet(newCount);
+
+                                               // This is a non-blocking call, 
and must happen in the
+                                               // synchronized block to avoid 
re=ordering the executor
+                                               // enqueue with a subsequent 
incoming close operation
+                                               aggregateAndForward(f, 
eventStream, event, queue,
+                                                               ex, elapsed);
+                                       }
+                                       // These must happen outside the 
synchronized block as we
+                                       // call out to user code
+                                       queueRef.set(
+                                                       
getQueueForInternalBuffering(maxEvents.getAsInt()));
+                                       scheduler.schedule(
+                                                       
getWindowTask(eventStream, f, time, maxEvents, lock,
+                                                                       
newCount, queueRef, timestamp, counter, ex),
+                                                       time.get().toNanos(), 
NANOSECONDS);
+
+                                       return CONTINUE;
+                               } else {
+                                       long elapsed;
+                                       synchronized (lock) {
+                                               queue = queueRef.get();
+                                               queueRef.lazySet(null);
+                                               endPending.set(true);
+                                               long now = System.nanoTime();
+                                               elapsed = now - timestamp.get();
+                                               counter.lazySet(counter.get() + 
1);
+                                       }
+                                       Collection<T> collected = queue == null 
? emptyList()
+                                                       : queue;
+                                       ex.execute(() -> {
+                                               try {
+                                                       eventStream
+                                                                       
.handleEvent(PushEvent.data(f.apply(
+                                                                               
        Long.valueOf(NANOSECONDS
+                                                                               
                        .toMillis(elapsed)),
+                                                                               
        collected)));
+                                               } catch (Exception e) {
+                                                       
close(PushEvent.error(e));
+                                               }
+                                       });
+                               }
+                               ex.execute(() -> 
eventStream.handleEvent(event.nodata()));
+                               return ABORT;
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               return eventStream;
+       }
+
+       protected Queue<T> getQueueForInternalBuffering(int size) {
+               if (size == 0) {
+                       return new LinkedList<T>();
+               } else {
+                       return new ArrayQueue<>(size - 1);
+               }
+       }
+       
+       @SuppressWarnings("unchecked")
+       /**
+        * A special queue that keeps one element in reserve and can have that 
last
+        * element set using forcePush. After the element is set the capacity is
+        * permanently increased by one and cannot grow further.
+        * 
+        * @param <E> The element type
+        */
+       private static class ArrayQueue<E> extends AbstractQueue<E>
+                       implements Queue<E> {
+
+               final Object[]  store;
+
+               int                             normalLength;
+
+               int                             nextIndex;
+
+               int                             size;
+
+               ArrayQueue(int capacity) {
+                       store = new Object[capacity + 1];
+                       normalLength = store.length - 1;
+               }
+
+               @Override
+               public boolean offer(E e) {
+                       if (e == null)
+                               throw new NullPointerException("Null values are 
not supported");
+                       if (size < normalLength) {
+                               store[nextIndex] = e;
+                               size++;
+                               nextIndex++;
+                               nextIndex = nextIndex % normalLength;
+                               return true;
+                       }
+                       return false;
+               }
+
+               public void forcePush(E e) {
+                       store[normalLength] = e;
+                       normalLength++;
+                       size++;
+               }
+
+               @Override
+               public E poll() {
+                       if (size == 0) {
+                               return null;
+                       } else {
+                               int idx = nextIndex - size;
+                               if (idx < 0) {
+                                       idx += normalLength;
+                               }
+                               E value = (E) store[idx];
+                               store[idx] = null;
+                               size--;
+                               return value;
+                       }
+               }
+
+               @Override
+               public E peek() {
+                       if (size == 0) {
+                               return null;
+                       } else {
+                               int idx = nextIndex - size;
+                               if (idx < 0) {
+                                       idx += normalLength;
+                               }
+                               return (E) store[idx];
+                       }
+               }
+
+               @Override
+               public Iterator<E> iterator() {
+                       final int previousNext = nextIndex;
+                       return new Iterator<E>() {
+
+                               int idx;
+
+                               int     remaining       = size;
+
+                               {
+                                       idx = nextIndex - size;
+                                       if (idx < 0) {
+                                               idx += normalLength;
+                                       }
+                               }
+
+                               @Override
+                               public boolean hasNext() {
+                                       if (nextIndex != previousNext) {
+                                               throw new 
ConcurrentModificationException(
+                                                               "The queue was 
concurrently modified");
+                                       }
+                                       return remaining > 0;
+                               }
+
+                               @Override
+                               public E next() {
+                                       if (!hasNext()) {
+                                               throw new 
NoSuchElementException(
+                                                               "The iterator 
has no more values");
+                                       }
+                                       E value = (E) store[idx];
+                                       idx++;
+                                       remaining--;
+                                       if (idx == normalLength) {
+                                               idx = 0;
+                                       }
+                                       return value;
+                               }
+
+                       };
+               }
+
+               @Override
+               public int size() {
+                       return size;
+               }
+
+       }
+
+       private <R> Runnable getWindowTask(AbstractPushStreamImpl<R> 
eventStream,
+                       BiFunction<Long,Collection<T>,R> f, Supplier<Duration> 
time,
+                       IntSupplier maxEvents, Object lock, long 
expectedCounter,
+                       AtomicReference<Queue<T>> queueRef, AtomicLong 
timestamp,
+                       AtomicLong counter, Executor executor) {
+               return () -> {
+
+                       Queue<T> queue = null;
+                       long elapsed;
+                       synchronized (lock) {
+                               
+                               if (counter.get() != expectedCounter) {
+                                       return;
+                               }
+                               counter.lazySet(expectedCounter + 1);
+
+                               long now = System.nanoTime();
+                               elapsed = now - timestamp.get();
+                               timestamp.lazySet(now);
+
+                               queue = queueRef.get();
+                               queueRef.lazySet(null);
+
+                               // This is a non-blocking call, and must happen 
in the
+                               // synchronized block to avoid re=ordering the 
executor
+                               // enqueue with a subsequent incoming close 
operation
+
+                               Collection<T> collected = queue == null ? 
emptyList() : queue;
+                               executor.execute(() -> {
+                                       try {
+                                               
eventStream.handleEvent(PushEvent.data(f.apply(
+                                                               
Long.valueOf(NANOSECONDS.toMillis(elapsed)),
+                                                               collected)));
+                                       } catch (Exception e) {
+                                               close(PushEvent.error(e));
+                                       }
+                               });
+                       }
+
+                       // These must happen outside the synchronized block as 
we
+                       // call out to user code
+                       
queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
+                       scheduler.schedule(
+                                       getWindowTask(eventStream, f, time, 
maxEvents, lock,
+                                                       expectedCounter + 1, 
queueRef, timestamp, counter,
+                                                       executor),
+                                       time.get().toNanos(), NANOSECONDS);
+               };
+       }
+
+       private <R> void aggregateAndForward(BiFunction<Long,Collection<T>,R> f,
+                       AbstractPushStreamImpl<R> eventStream,
+                       PushEvent< ? extends T> event, Queue<T> queue, Executor 
executor,
+                       long elapsed) {
+               executor.execute(() -> {
+                       try {
+                               if (!queue.offer(event.getData())) {
+                                       ((ArrayQueue<T>) 
queue).forcePush(event.getData());
+                               }
+                               long result = 
eventStream.handleEvent(PushEvent.data(
+                                               
f.apply(Long.valueOf(NANOSECONDS.toMillis(elapsed)),
+                                                               queue)));
+                               if (result < 0) {
+                                       close();
+                               }
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                       }
+               });
+       }
+
+       @Override
+       public Promise<Void> forEach(Consumer< ? super T> action) {
+               Deferred<Void> d = new Deferred<>();
+               updateNext((event) -> {
+                               try {
+                                       switch(event.getType()) {
+                                               case DATA:
+                                                       
action.accept(event.getData());
+                                                       return CONTINUE;
+                                               case CLOSE:
+                                                       d.resolve(null);
+                                                       break;
+                                               case ERROR:
+                                                       
d.fail(event.getFailure());
+                                                       break;
+                                       }
+                                       close(event.nodata());
+                                       return ABORT;
+                               } catch (Exception e) {
+                                       d.fail(e);
+                                       return ABORT;
+                               }
+                       });
+               begin();
+               return d.getPromise();
+       }
+
+       @Override
+       public Promise<Object[]> toArray() {
+               return collect(Collectors.toList())
+                               .map(List::toArray);
+       }
+
+       @Override
+       public <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator) {
+               return collect(Collectors.toList())
+                               .map(l -> l.toArray(generator.apply(l.size())));
+       }
+
+       @Override
+       public Promise<T> reduce(T identity, BinaryOperator<T> accumulator) {
+               Deferred<T> d = new Deferred<>();
+               AtomicReference<T> iden = new AtomicReference<T>(identity);
+
+               updateNext(event -> {
+                       try {
+                               switch(event.getType()) {
+                                       case DATA:
+                                               
iden.accumulateAndGet(event.getData(), accumulator);
+                                               return CONTINUE;
+                                       case CLOSE:
+                                               d.resolve(iden.get());
+                                               break;
+                                       case ERROR:
+                                               d.fail(event.getFailure());
+                                               break;
+                               }
+                               close(event.nodata());
+                               return ABORT;
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               begin();
+               return d.getPromise();
+       }
+
+       @Override
+       public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator) {
+               Deferred<Optional<T>> d = new Deferred<>();
+               AtomicReference<T> iden = new AtomicReference<T>(null);
+
+               updateNext(event -> {
+                       try {
+                               switch(event.getType()) {
+                                       case DATA:
+                                               if (!iden.compareAndSet(null, 
event.getData()))
+                                                       
iden.accumulateAndGet(event.getData(), accumulator);
+                                               return CONTINUE;
+                                       case CLOSE:
+                                               
d.resolve(Optional.ofNullable(iden.get()));
+                                               break;
+                                       case ERROR:
+                                               d.fail(event.getFailure());
+                                               break;
+                               }
+                               close(event.nodata());
+                               return ABORT;
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               begin();
+               return d.getPromise();
+       }
+
+       @Override
+       public <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U> 
accumulator, BinaryOperator<U> combiner) {
+               Deferred<U> d = new Deferred<>();
+               AtomicReference<U> iden = new AtomicReference<>(identity);
+
+               updateNext(event -> {
+                       try {
+                               switch(event.getType()) {
+                                       case DATA:
+                                               iden.updateAndGet((e) -> 
accumulator.apply(e, event.getData()));
+                                               return CONTINUE;
+                                       case CLOSE:
+                                               d.resolve(iden.get());
+                                               break;
+                                       case ERROR:
+                                               d.fail(event.getFailure());
+                                               break;
+                               }
+                               close(event.nodata());
+                               return ABORT;
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               begin();
+               return d.getPromise();
+       }
+
+       @Override
+       public <R, A> Promise<R> collect(Collector<? super T, A, R> collector) {
+               A result = collector.supplier().get();
+               Deferred<R> d = new Deferred<>();
+               updateNext(event -> {
+                       try {
+                               switch(event.getType()) {
+                                       case DATA:
+                                               
collector.accumulator().accept(result, event.getData());
+                                               return CONTINUE;
+                                       case CLOSE:
+                                               
d.resolve(collector.finisher().apply(result));
+                                               break;
+                                       case ERROR:
+                                               d.fail(event.getFailure());
+                                               break;
+                               }
+                               close(event.nodata());
+                               return ABORT;
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               begin();
+               return d.getPromise();
+       }
+
+       @Override
+       public Promise<Optional<T>> min(Comparator<? super T> comparator)  {
+               return reduce((a, b) -> comparator.compare(a, b) <= 0 ? a : b);
+       }
+
+       @Override
+       public Promise<Optional<T>> max(Comparator<? super T> comparator) {
+               return reduce((a, b) -> comparator.compare(a, b) > 0 ? a : b);
+       }
+
+       @Override
+       public Promise<Long> count() {
+               Deferred<Long> d = new Deferred<>();
+               LongAdder counter = new LongAdder();
+               updateNext((event) -> {
+                               try {
+                                       switch(event.getType()) {
+                                               case DATA:
+                                               counter.add(1);
+                                                       return CONTINUE;
+                                               case CLOSE:
+                                               
d.resolve(Long.valueOf(counter.sum()));
+                                                       break;
+                                               case ERROR:
+                                                       
d.fail(event.getFailure());
+                                                       break;
+                                       }
+                                       close(event.nodata());
+                                       return ABORT;
+                               } catch (Exception e) {
+                               close(PushEvent.error(e));
+                                       return ABORT;
+                               }
+                       });
+               begin();
+               return d.getPromise();
+       }
+
+       @Override
+       public Promise<Boolean> anyMatch(Predicate<? super T> predicate) {
+               return filter(predicate).findAny()
+                       .map(Optional::isPresent);
+       }
+
+       @Override
+       public Promise<Boolean> allMatch(Predicate<? super T> predicate) {
+               return filter(x -> !predicate.test(x)).findAny()
+                               .map(o -> Boolean.valueOf(!o.isPresent()));
+       }
+
+       @Override
+       public Promise<Boolean> noneMatch(Predicate<? super T> predicate) {
+               return filter(predicate).findAny()
+                               .map(o -> Boolean.valueOf(!o.isPresent()));
+       }
+
+       @Override
+       public Promise<Optional<T>> findFirst() {
+               Deferred<Optional<T>> d = new Deferred<>();
+               updateNext((event) -> {
+                               try {
+                                       Optional<T> o = null;
+                                       switch(event.getType()) {
+                                               case DATA:
+                                                       o = 
Optional.of(event.getData());
+                                                       break;
+                                               case CLOSE:
+                                                       o = Optional.empty();
+                                                       break;
+                                               case ERROR:
+                                                       
d.fail(event.getFailure());
+                                                       return ABORT;
+                                       }
+                                       if(!d.getPromise().isDone())
+                                               d.resolve(o);
+                                       return ABORT;
+                               } catch (Exception e) {
+                               close(PushEvent.error(e));
+                                       return ABORT;
+                               }
+                       });
+               begin();
+               return d.getPromise();
+       }
+
+       @Override
+       public Promise<Optional<T>> findAny() {
+               return findFirst();
+       }
+
+       @Override
+       public Promise<Long> forEachEvent(PushEventConsumer< ? super T> action) 
{
+               Deferred<Long> d = new Deferred<>();
+               LongAdder la = new LongAdder();
+               updateNext((event) -> {
+                       try {
+                               switch(event.getType()) {
+                                       case DATA:
+                                               long value = 
action.accept(event);
+                                               la.add(value);
+                                               return value;
+                                       case CLOSE:
+                                               try {
+                                                       action.accept(event);
+                                               } finally {
+                                                       
d.resolve(Long.valueOf(la.sum()));
+                                               }
+                                               break;
+                                       case ERROR:
+                                               try {
+                                                       action.accept(event);
+                                               } finally {
+                                                       
d.fail(event.getFailure());
+                                               }
+                                               break;
+                               }
+                               return ABORT;
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                               return ABORT;
+                       }
+               });
+               begin();
+               return d.getPromise();
+       }
+
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/BufferedPushStreamImpl.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.CLOSED;
+import static org.osgi.util.pushstream.PushEventConsumer.ABORT;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushEventConsumer;
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamProvider;
+import org.osgi.util.pushstream.PushbackPolicy;
+import org.osgi.util.pushstream.QueuePolicy;
+
+public class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? 
extends T>>>
+               extends UnbufferedPushStreamImpl<T,U> implements PushStream<T> {
+       
+       private final U eventQueue;
+       
+       private final Semaphore semaphore;
+       
+       private final Executor worker;
+       
+       private final QueuePolicy<T, U> queuePolicy;
+
+       private final PushbackPolicy<T, U> pushbackPolicy;
+       
+       /**
+        * Indicates that a terminal event has been received, that we should 
stop
+        * collecting new events, and that we must drain the buffer before
+        * continuing
+        */
+       private final AtomicBoolean                     softClose       = new 
AtomicBoolean();
+
+       private final int                                       parallelism;
+
+       public BufferedPushStreamImpl(PushStreamProvider psp,
+                       ScheduledExecutorService scheduler, U eventQueue,
+                       int parallelism, Executor worker, QueuePolicy<T,U> 
queuePolicy,
+                       PushbackPolicy<T,U> pushbackPolicy,
+                       Function<PushEventConsumer<T>,AutoCloseable> connector) 
{
+               super(psp, worker, scheduler, connector);
+               this.eventQueue = eventQueue;
+               this.parallelism = parallelism;
+               this.semaphore = new Semaphore(parallelism);
+               this.worker = worker;
+               this.queuePolicy = queuePolicy;
+               this.pushbackPolicy = pushbackPolicy;
+       }
+
+       @Override
+       protected long handleEvent(PushEvent< ? extends T> event) {
+
+               // If we have already been soft closed, or hard closed then 
abort
+               if (!softClose.compareAndSet(false, event.isTerminal())
+                               || closed.get() == CLOSED) {
+                       return ABORT;
+               }
+
+               try {
+                       queuePolicy.doOffer(eventQueue, event);
+                       long backPressure = pushbackPolicy.pushback(eventQueue);
+                       if(backPressure < 0) {
+                               close();
+                               return ABORT;
+                       }
+                       if(semaphore.tryAcquire()) {
+                               startWorker();
+                       }
+                       return backPressure;
+               } catch (Exception e) {
+                       close(PushEvent.error(e));
+                       return ABORT;
+               }
+       }
+
+       private void startWorker() {
+               worker.execute(() -> {
+                       try {
+                               PushEvent< ? extends T> event;
+                               while ((event = eventQueue.poll()) != null) {
+                                       if (event.isTerminal()) {
+                                               // Wait for the other threads 
to finish
+                                               semaphore.acquire(parallelism - 
1);
+                                       }
+
+                                       long backpressure = 
super.handleEvent(event);
+                                       if(backpressure < 0) {
+                                               close();
+                                               return;
+                                       } else if(backpressure > 0) {
+                                               
scheduler.schedule(this::startWorker, backpressure,
+                                                               MILLISECONDS);
+                                               return;
+                                       }
+                               }
+
+                               semaphore.release();
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                       }
+                       if(eventQueue.peek() != null && semaphore.tryAcquire()) 
{
+                               try {
+                                       startWorker();
+                               } catch (Exception e) {
+                                       close(PushEvent.error(e));
+                               }
+                       }
+               });
+               
+       }
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/IntermediatePushStreamImpl.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.*;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamProvider;
+
+public class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T>
+               implements PushStream<T> {
+       
+       private final AbstractPushStreamImpl< ? > previous;
+       
+       protected IntermediatePushStreamImpl(PushStreamProvider psp,
+                       Executor executor, ScheduledExecutorService scheduler,
+                       AbstractPushStreamImpl< ? > previous) {
+               super(psp, executor, scheduler);
+               this.previous = previous;
+       }
+
+       @Override
+       protected boolean begin() {
+               if(closed.compareAndSet(BUILDING, STARTED)) {
+                       beginning();
+                       previous.begin();
+                       return true;
+               }
+               return false;
+       }
+
+       protected void beginning() {
+               // The base implementation has nothing to do, but
+               // this method is used in windowing
+       }
+       
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/SimplePushEventSourceImpl.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static java.util.Collections.emptyList;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+import org.osgi.util.promise.Promises;
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushEventConsumer;
+import org.osgi.util.pushstream.QueuePolicy;
+import org.osgi.util.pushstream.SimplePushEventSource;
+
+public class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? 
extends T>>>
+               implements SimplePushEventSource<T> {
+
+       private final Object                                                    
        lock            = new Object();
+
+       private final Executor                                                  
        worker;
+
+       private final ScheduledExecutorService                          
scheduler;
+
+       private final QueuePolicy<T,U>                                          
queuePolicy;
+
+       private final U                                                         
                queue;
+
+       private final int                                                       
                parallelism;
+
+       private final Semaphore                                                 
        semaphore;
+
+       private final List<PushEventConsumer< ? super T>>       connected       
= new ArrayList<>();
+
+       private final Runnable                                                  
        onClose;
+
+       private boolean                                                         
                closed;
+       
+       private Deferred<Void>                                                  
        connectPromise;
+
+       private boolean                                                         
                waitForFinishes;
+
+
+       public SimplePushEventSourceImpl(Executor worker,
+                       ScheduledExecutorService scheduler, QueuePolicy<T,U> 
queuePolicy,
+                       U queue, int parallelism, Runnable onClose) {
+               this.worker = worker;
+               this.scheduler = scheduler;
+               this.queuePolicy = queuePolicy;
+               this.queue = queue;
+               this.parallelism = parallelism;
+               this.semaphore = new Semaphore(parallelism);
+               this.onClose = onClose;
+               this.closed = false;
+               this.connectPromise = null;
+       }
+
+       @Override
+       public AutoCloseable open(PushEventConsumer< ? super T> pec)
+                       throws Exception {
+               Deferred<Void> toResolve = null;
+               synchronized (lock) {
+                       if (closed) {
+                               throw new IllegalStateException(
+                                               "This PushEventConsumer is 
closed");
+                       }
+
+                       toResolve = connectPromise;
+                       connectPromise = null;
+
+                       connected.add(pec);
+               }
+
+               if (toResolve != null) {
+                       toResolve.resolve(null);
+               }
+
+               return () -> {
+                       closeConsumer(pec, PushEvent.close());
+               };
+       }
+
+       private void closeConsumer(PushEventConsumer< ? super T> pec,
+                       PushEvent<T> event) {
+               boolean sendClose;
+               synchronized (lock) {
+                       sendClose = connected.remove(pec);
+               }
+               if (sendClose) {
+                       doSend(pec, event);
+               }
+       }
+
+       private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> 
event) {
+               try {
+                       worker.execute(() -> safePush(pec, event));
+               } catch (RejectedExecutionException ree) {
+                       // TODO log?
+                       if (!event.isTerminal()) {
+                               close(PushEvent.error(ree));
+                       } else {
+                               safePush(pec, event);
+                       }
+               }
+       }
+
+       @SuppressWarnings("boxing")
+       private Promise<Long> doSendWithBackPressure(
+                       PushEventConsumer< ? super T> pec, PushEvent<T> event) {
+               Deferred<Long> d = new Deferred<>();
+               try {
+                       worker.execute(
+                                       () -> d.resolve(System.nanoTime() + 
safePush(pec, event)));
+               } catch (RejectedExecutionException ree) {
+                       // TODO log?
+                       if (!event.isTerminal()) {
+                               close(PushEvent.error(ree));
+                               return Promises.resolved(System.nanoTime());
+                       } else {
+                               return Promises
+                                               .resolved(System.nanoTime() + 
safePush(pec, event));
+                       }
+               }
+               return d.getPromise();
+       }
+
+       private long safePush(PushEventConsumer< ? super T> pec,
+                       PushEvent<T> event) {
+               try {
+                       long backpressure = pec.accept(event) * 1000000;
+                       if (backpressure < 0 && !event.isTerminal()) {
+                               closeConsumer(pec, PushEvent.close());
+                               return -1;
+                       }
+                       return backpressure;
+               } catch (Exception e) {
+                       // TODO log?
+                       if (!event.isTerminal()) {
+                               closeConsumer(pec, PushEvent.error(e));
+                       }
+                       return -1;
+               }
+       }
+
+       @Override
+       public void close() {
+               close(PushEvent.close());
+       }
+
+       private void close(PushEvent<T> event) {
+               List<PushEventConsumer< ? super T>> toClose;
+               Deferred<Void> toFail = null;
+               synchronized (lock) {
+                       if(!closed) {
+                               closed = true;
+                               
+                               toClose = new ArrayList<>(connected);
+                               connected.clear();
+                               queue.clear();
+
+                               if(connectPromise != null) {
+                                       toFail = connectPromise;
+                                       connectPromise = null;
+                               }
+                       } else {
+                               toClose = emptyList();
+                       }
+               }
+
+               toClose.stream().forEach(pec -> doSend(pec, event));
+
+               if (toFail != null) {
+                       toFail.resolveWith(closedConnectPromise());
+               }
+
+               onClose.run();
+       }
+
+       @Override
+       public void publish(T t) {
+               enqueueEvent(PushEvent.data(t));
+       }
+
+       @Override
+       public void endOfStream() {
+               enqueueEvent(PushEvent.close());
+       }
+
+       @Override
+       public void error(Exception e) {
+               enqueueEvent(PushEvent.error(e));
+       }
+
+       private void enqueueEvent(PushEvent<T> event) {
+               synchronized (lock) {
+                       if (closed || connected.isEmpty()) {
+                               return;
+                       }
+               }
+
+               try {
+                       queuePolicy.doOffer(queue, event);
+                       boolean start;
+                       synchronized (lock) {
+                               start = !waitForFinishes && 
semaphore.tryAcquire();
+                       }
+                       if (start) {
+                               startWorker();
+                       }
+               } catch (Exception e) {
+                       close(PushEvent.error(e));
+                       throw new IllegalStateException(
+                                       "The queue policy threw an exception", 
e);
+               }
+       }
+
+       @SuppressWarnings({
+                       "unchecked", "boxing"
+       })
+       private void startWorker() {
+               worker.execute(() -> {
+                       try {
+                               
+                               for(;;) {
+                                       PushEvent<T> event;
+                                       List<PushEventConsumer< ? super T>> 
toCall;
+                                       boolean resetWait = false;
+                                       synchronized (lock) {
+                                               if(waitForFinishes) {
+                                                       semaphore.release();
+                                                       while(waitForFinishes) {
+                                                               
lock.notifyAll();
+                                                               lock.wait();
+                                                       }
+                                                       semaphore.acquire();
+                                               }
+
+                                               event = (PushEvent<T>) 
queue.poll();
+                                               
+                                               if(event == null) {
+                                                       break;
+                                               }
+
+                                               toCall = new 
ArrayList<>(connected);
+                                               if (event.isTerminal()) {
+                                                       waitForFinishes = true;
+                                                       resetWait = true;
+                                                       connected.clear();
+                                                       while 
(!semaphore.tryAcquire(parallelism - 1)) {
+                                                               lock.wait();
+                                                       }
+                                               }
+                                       }
+                                       
+                                       List<Promise<Long>> calls = 
toCall.stream().map(pec -> {
+                                               if (semaphore.tryAcquire()) {
+                                                       try {
+                                                               return 
doSendWithBackPressure(pec, event);
+                                                       } finally {
+                                                               
semaphore.release();
+                                                       }
+                                               } else {
+                                                       return 
Promises.resolved(
+                                                                       
System.nanoTime() + safePush(pec, event));
+                                               }
+                                       }).collect(toList());
+
+                                       long toWait = 
Promises.<Long,Long>all(calls)
+                                                       .map(l -> l.stream()
+                                                                       
.max((a,b) -> a.compareTo(b))
+                                                                               
.orElseGet(() -> System.nanoTime()))
+                                                       .getValue() - 
System.nanoTime();
+                                       
+                                       
+                                       if (toWait > 0) {
+                                               
scheduler.schedule(this::startWorker, toWait,
+                                                               NANOSECONDS);
+                                               return;
+                                       }
+
+                                       if (resetWait == true) {
+                                               synchronized (lock) {
+                                                       waitForFinishes = false;
+                                                       lock.notifyAll();
+                                               }
+                                       }
+                               }
+
+                               semaphore.release();
+                       } catch (Exception e) {
+                               close(PushEvent.error(e));
+                       }
+                       if (queue.peek() != null && semaphore.tryAcquire()) {
+                               try {
+                                       startWorker();
+                               } catch (Exception e) {
+                                       close(PushEvent.error(e));
+                               }
+                       }
+               });
+
+       }
+
+       @Override
+       public boolean isConnected() {
+               synchronized (lock) {
+                       return !connected.isEmpty();
+               }
+       }
+
+       @Override
+       public Promise<Void> connectPromise() {
+               synchronized (lock) {
+                       if (closed) {
+                               return closedConnectPromise();
+                       }
+
+                       if (connected.isEmpty()) {
+                               if (connectPromise == null) {
+                                       connectPromise = new Deferred<>();
+                               }
+                               return connectPromise.getPromise();
+                       } else {
+                               return Promises.resolved(null);
+                       }
+               }
+       }
+
+       private Promise<Void> closedConnectPromise() {
+               return Promises.failed(new IllegalStateException(
+                               "This SimplePushEventSource is closed"));
+       }
+
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/apache/aries/pushstream/UnbufferedPushStreamImpl.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.pushstream;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.*;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import org.osgi.util.pushstream.PushEvent;
+import org.osgi.util.pushstream.PushEventConsumer;
+import org.osgi.util.pushstream.PushStream;
+import org.osgi.util.pushstream.PushStreamProvider;
+
+public class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? 
extends T>>>
+       extends AbstractPushStreamImpl<T> implements PushStream<T> {
+       
+       protected final Function<PushEventConsumer<T>,AutoCloseable>    
connector;
+       
+       protected final AtomicReference<AutoCloseable>                          
        upstream        = new AtomicReference<AutoCloseable>();
+       
+       public UnbufferedPushStreamImpl(PushStreamProvider psp,
+                       Executor executor, ScheduledExecutorService scheduler,
+                       Function<PushEventConsumer<T>,AutoCloseable> connector) 
{
+               super(psp, executor, scheduler);
+               this.connector = connector;
+       }
+
+       @Override
+       protected boolean close(PushEvent<T> event) {
+               if(super.close(event)) {
+                       ofNullable(upstream.getAndSet(() -> {
+                               // This block doesn't need to do anything, but 
the presence
+                               // of the Closable is needed to prevent 
duplicate begins
+                       })).ifPresent(c -> {
+                                       try {
+                                               c.close();
+                                       } catch (Exception e) {
+                                               // TODO Auto-generated catch 
block
+                                               e.printStackTrace();
+                                       }
+                               });
+                       return true;
+               }
+               return false;
+       }
+
+       @Override
+       protected boolean begin() {
+               if(closed.compareAndSet(BUILDING, STARTED)) {
+                       AutoCloseable toClose = 
connector.apply(this::handleEvent);
+                       if(!upstream.compareAndSet(null,toClose)) {
+                               //TODO log that we tried to connect twice...
+                               try {
+                                       toClose.close();
+                               } catch (Exception e) {
+                                       // TODO Auto-generated catch block
+                                       e.printStackTrace();
+                               }
+                       }
+
+                       if (closed.get() == CLOSED
+                                       && upstream.compareAndSet(toClose, 
null)) {
+                               // We closed before setting the upstream - 
close it now
+                               try {
+                                       toClose.close();
+                               } catch (Exception e) {
+                                       // TODO Auto-generated catch block
+                                       e.printStackTrace();
+                               }
+                       }
+                       return true;
+               }
+               return false;
+       }
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/AbstractBufferBuilder.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+abstract class AbstractBufferBuilder<R, T, U extends BlockingQueue<PushEvent< 
? extends T>>>
+               implements BufferBuilder<R,T,U> {
+
+       protected Executor                              worker;
+       protected int                                   concurrency;
+       protected PushbackPolicy<T,U>   backPressure;
+       protected QueuePolicy<T,U>              bufferingPolicy;
+       protected U                                             buffer;
+
+       @Override
+       public BufferBuilder<R,T,U> withBuffer(U queue) {
+               this.buffer = queue;
+               return this;
+       }
+
+       @Override
+       public BufferBuilder<R,T,U> withQueuePolicy(
+                       QueuePolicy<T,U> queuePolicy) {
+               this.bufferingPolicy = queuePolicy;
+               return this;
+       }
+
+       @Override
+       public BufferBuilder<R,T,U> withQueuePolicy(
+                       QueuePolicyOption queuePolicyOption) {
+               this.bufferingPolicy = queuePolicyOption.getPolicy();
+               return this;
+       }
+
+       @Override
+       public BufferBuilder<R,T,U> withPushbackPolicy(
+                       PushbackPolicy<T,U> pushbackPolicy) {
+               this.backPressure = pushbackPolicy;
+               return this;
+       }
+
+       @Override
+       public BufferBuilder<R,T,U> withPushbackPolicy(
+                       PushbackPolicyOption pushbackPolicyOption, long time) {
+               this.backPressure = pushbackPolicyOption.getPolicy(time);
+               return this;
+       }
+
+       @Override
+       public BufferBuilder<R,T,U> withParallelism(int parallelism) {
+               this.concurrency = parallelism;
+               return this;
+       }
+
+       @Override
+       public BufferBuilder<R,T,U> withExecutor(Executor executor) {
+               this.worker = executor;
+               return this;
+       }
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/BufferBuilder.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+/**
+ * Create a buffered section of a Push-based stream
+ *
+ * @param <R> The type of object being built
+ * @param <T> The type of objects in the {@link PushEvent}
+ * @param <U> The type of the Queue used in the user specified buffer
+ */
+public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? 
extends T>>> {
+
+       /**
+        * The BlockingQueue implementation to use as a buffer
+        * 
+        * @param queue
+        * @return this builder
+        */
+       BufferBuilder<R, T, U> withBuffer(U queue);
+
+       /**
+        * Set the {@link QueuePolicy} of this Builder
+        * 
+        * @param queuePolicy
+        * @return this builder
+        */
+       BufferBuilder<R,T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy);
+
+       /**
+        * Set the {@link QueuePolicy} of this Builder
+        * 
+        * @param queuePolicyOption
+        * @return this builder
+        */
+       BufferBuilder<R, T, U> withQueuePolicy(QueuePolicyOption 
queuePolicyOption);
+
+       /**
+        * Set the {@link PushbackPolicy} of this builder
+        * 
+        * @param pushbackPolicy
+        * @return this builder
+        */
+       BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicy<T, U> 
pushbackPolicy);
+
+       /**
+        * Set the {@link PushbackPolicy} of this builder
+        * 
+        * @param pushbackPolicyOption
+        * @param time
+        * @return this builder
+        */
+       BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicyOption 
pushbackPolicyOption, long time);
+
+       /**
+        * Set the maximum permitted number of concurrent event deliveries 
allowed
+        * from this buffer
+        * 
+        * @param parallelism
+        * @return this builder
+        */
+       BufferBuilder<R, T, U> withParallelism(int parallelism);
+
+       /**
+        * Set the {@link Executor} that should be used to deliver events from 
this
+        * buffer
+        * 
+        * @param executor
+        * @return this builder
+        */
+       BufferBuilder<R, T, U> withExecutor(Executor executor);
+       
+       /**
+        * @return the object being built
+        */
+       R create();
+
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEvent.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,205 @@
+/*
+ * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import static org.osgi.util.pushstream.PushEvent.EventType.*;
+
+/**
+ * A PushEvent is an immutable object that is transferred through a
+ * communication channel to push information to a downstream consumer. The 
event
+ * has three different types:
+ * <ul>
+ * <li>{@link EventType#DATA} – Provides access to a typed data element in 
the
+ * stream.
+ * <li>{@link EventType#CLOSE} – The stream is closed. After receiving this
+ * event, no more events will follow.
+ * <li>{@link EventType#ERROR} – The stream ran into an unrecoverable problem
+ * and is sending the reason downstream. The stream is closed and no more 
events
+ * will follow after this event.
+ * </ul>
+ *
+ * @param <T> The payload type of the event.
+ * @Immutable
+ */
+public abstract class PushEvent<T> {
+
+       /**
+        * The type of a {@link PushEvent}.
+        */
+       public static enum EventType {
+               /**
+                * A data event forming part of the stream
+                */
+               DATA,
+               /**
+                * An error event that indicates streaming has failed and that 
no more
+                * events will arrive
+                */
+               ERROR,
+               /**
+                * An event that indicates that the stream has terminated 
normally
+                */
+               CLOSE
+       }
+
+       /**
+        * Package private default constructor.
+        */
+       PushEvent() {}
+
+       /**
+        * Get the type of this event.
+        * 
+        * @return The type of this event.
+        */
+       public abstract EventType getType();
+
+       /**
+        * Return the data for this event.
+        * 
+        * @return The data payload.
+        * @throws IllegalStateException if this event is not a
+        *             {@link EventType#DATA} event.
+        */
+       public T getData() throws IllegalStateException {
+               throw new IllegalStateException(
+                               "Not a DATA event, the event type is " + 
getType());
+       }
+
+       /**
+        * Return the error that terminated the stream.
+        * 
+        * @return The error that terminated the stream.
+        * @throws IllegalStateException if this event is not an
+        *             {@link EventType#ERROR} event.
+        */
+       public Exception getFailure() throws IllegalStateException {
+               throw new IllegalStateException(
+                               "Not an ERROR event, the event type is " + 
getType());
+       }
+
+       /**
+        * Answer if no more events will follow after this event.
+        * 
+        * @return {@code false} if this is a data event, otherwise {@code 
true}.
+        */
+       public boolean isTerminal() {
+               return true;
+       }
+
+       /**
+        * Create a new data event.
+        * 
+        * @param <T> The payload type.
+        * @param payload The payload.
+        * @return A new data event wrapping the specified payload.
+        */
+       public static <T> PushEvent<T> data(T payload) {
+               return new DataEvent<T>(payload);
+       }
+
+       /**
+        * Create a new error event.
+        * 
+        * @param <T> The payload type.
+        * @param e The error.
+        * @return A new error event with the specified error.
+        */
+       public static <T> PushEvent<T> error(Exception e) {
+               return new ErrorEvent<T>(e);
+       }
+
+       /**
+        * Create a new close event.
+        * 
+        * @param <T> The payload type.
+        * @return A new close event.
+        */
+       public static <T> PushEvent<T> close() {
+               return new CloseEvent<T>();
+       }
+
+       /**
+        * Convenience to cast a close/error event to another payload type. 
Since
+        * the payload type is not needed for these events this is harmless. 
This
+        * therefore allows you to forward the close/error event downstream 
without
+        * creating anew event.
+        * 
+        * @param <X> The new payload type.
+        * @return The current error or close event mapped to a new payload 
type.
+        * @throws IllegalStateException if the event is a {@link 
EventType#DATA}
+        *             event.
+        */
+       public <X> PushEvent<X> nodata() throws IllegalStateException {
+               @SuppressWarnings("unchecked")
+               PushEvent<X> result = (PushEvent<X>) this;
+               return result;
+       }
+
+       static final class DataEvent<T> extends PushEvent<T> {
+               private final T data;
+
+               DataEvent(T data) {
+                       this.data = data;
+               }
+
+               @Override
+               public T getData() throws IllegalStateException {
+                       return data;
+               }
+
+               @Override
+               public EventType getType() {
+                       return DATA;
+               }
+
+               @Override
+               public boolean isTerminal() {
+                       return false;
+               }
+
+               @Override
+               public <X> PushEvent<X> nodata() throws IllegalStateException {
+                       throw new IllegalStateException("This event is a DATA 
event");
+               }
+       }
+
+       static final class ErrorEvent<T> extends PushEvent<T> {
+               private final Exception error;
+
+               ErrorEvent(Exception error) {
+                       this.error = error;
+               }
+
+               @Override
+               public Exception getFailure() {
+                       return error;
+               }
+
+               @Override
+               public EventType getType() {
+                       return ERROR;
+               }
+       }
+
+       static final class CloseEvent<T> extends PushEvent<T> {
+               @Override
+               public EventType getType() {
+                       return CLOSE;
+               }
+       }
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventConsumer.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * An Async Event Consumer asynchronously receives Data events until it 
receives
+ * either a Close or Error event.
+ * 
+ * @param <T>
+ *            The type for the event payload
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushEventConsumer<T> {
+
+       /**
+        * If ABORT is used as return value, the sender should close the 
channel all
+        * the way to the upstream source. The ABORT will not guarantee that no
+        * more events are delivered since this is impossible in a concurrent
+        * environment. The consumer should accept subsequent events and 
close/clean
+        * up when the Close or Error event is received.
+        * 
+        * Though ABORT has the value -1, any value less than 0 will act as an
+        * abort.
+        */
+       long    ABORT           = -1;
+
+       /**
+        * A 0 indicates that the consumer is willing to receive subsequent 
events
+        * at full speeds.
+        * 
+        * Any value more than 0 will indicate that the consumer is becoming
+        * overloaded and wants a delay of the given milliseconds before the 
next
+        * event is sent. This allows the consumer to pushback the event 
delivery
+        * speed.
+        */
+       long    CONTINUE        = 0;
+
+       /**
+        * Accept an event from a source. Events can be delivered on multiple
+        * threads simultaneously. However, Close and Error events are the last
+        * events received, no more events must be sent after them.
+        * 
+        * @param event The event
+        * @return less than 0 means abort, 0 means continue, more than 0 means
+        *         delay ms
+        * @throws Exception to indicate that an error has occured and that no
+        *         further events should be delivered to this
+        *         {@link PushEventConsumer}
+        */
+       long accept(PushEvent<? extends T> event) throws Exception;
+
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushEventSource.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * An event source. An event source can open a channel between a source and a
+ * consumer. Once the channel is opened (even before it returns) the source can
+ * send events to the consumer.
+ *
+ * A source should stop sending and automatically close the channel when 
sending
+ * an event returns a negative value, see {@link PushEventConsumer#ABORT}.
+ * Values that are larger than 0 should be treated as a request to delay the
+ * next events with those number of milliseconds.
+ * 
+ * @param <T>
+ *            The payload type
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushEventSource<T> {
+
+       /**
+        * Open the asynchronous channel between the source and the consumer. 
The
+        * call returns an {@link AutoCloseable}. This can be closed, and should
+        * close the channel, including sending a Close event if the channel 
was not
+        * already closed. The returned object must be able to be closed 
multiple
+        * times without sending more than one Close events.
+        * 
+        * @param aec the consumer (not null)
+        * @return a {@link AutoCloseable} that can be used to close the stream
+        * @throws Exception
+        */
+       AutoCloseable open(PushEventConsumer< ? super T> aec) throws Exception;
+}



Reply via email to