Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStream.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStream.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStream.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStream.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,586 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.IntSupplier;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.promise.Promise;
+
+/**
+ * A Push Stream fulfills the same role as the Java 8 stream but it reverses 
the
+ * control direction. The Java 8 stream is pull based and this is push based. A
+ * Push Stream makes it possible to build a pipeline of transformations using a
+ * builder kind of model. Just like streams, it provides a number of 
terminating
+ * methods that will actually open the channel and perform the processing until
+ * the channel is closed (The source sends a Close event). The results of the
+ * processing will be send to a Promise, just like any error events. A stream
+ * can be used multiple times. The Push Stream represents a pipeline. Upstream
+ * is in the direction of the source, downstream is in the direction of the
+ * terminating method. Events are sent downstream asynchronously with no
+ * guarantee for ordering or concurrency. Methods are available to provide
+ * serialization of the events and splitting in background threads.
+ * 
+ * @param <T> The Payload type
+ */
+@ProviderType
+public interface PushStream<T> extends AutoCloseable {
+
+       /**
+        * Must be run after the channel is closed. This handler will run after 
the
+        * downstream methods have processed the close event and before the 
upstream
+        * methods have closed.
+        * 
+        * @param closeHandler Will be called on close
+        * @return This stream
+        */
+       PushStream<T> onClose(Runnable closeHandler);
+
+       /**
+        * Must be run after the channel is closed. This handler will run after 
the
+        * downstream methods have processed the close event and before the 
upstream
+        * methods have closed.
+        * 
+        * @param closeHandler Will be called on close
+        * @return This stream
+        */
+       PushStream<T> onError(Consumer< ? super Throwable> closeHandler);
+
+       /**
+        * Only pass events downstream when the predicate tests true.
+        * 
+        * @param predicate The predicate that is tested (not null)
+        * @return Builder style (can be a new or the same object)
+        */
+       PushStream<T> filter(Predicate< ? super T> predicate);
+
+       /**
+        * Map a payload value.
+        * 
+        * @param mapper The map function
+        * @return Builder style (can be a new or the same object)
+        */
+       <R> PushStream<R> map(Function< ? super T, ? extends R> mapper);
+
+       /**
+        * Flat map the payload value (turn one event into 0..n events of
+        * potentially another type).
+        * 
+        * @param mapper The flat map function
+        * @return Builder style (can be a new or the same object)
+        */
+       <R> PushStream<R> flatMap(
+                       Function< ? super T, ? extends PushStream< ? extends 
R>> mapper);
+
+       /**
+        * Remove any duplicates. Notice that this can be expensive in a large
+        * stream since it must track previous payloads.
+        * 
+        * @return Builder style (can be a new or the same object)
+        */
+       PushStream<T> distinct();
+
+       /**
+        * Sorted the elements, assuming that T extends Comparable. This is of
+        * course expensive for large or infinite streams since it requires
+        * buffering the stream until close.
+        * 
+        * @return Builder style (can be a new or the same object)
+        */
+       PushStream<T> sorted();
+
+       /**
+        * Sorted the elements with the given comparator. This is of course
+        * expensive for large or infinite streams since it requires buffering 
the
+        * stream until close.
+        * 
+        * @param comparator
+        * @return Builder style (can be a new or the same object)
+        */
+       PushStream<T> sorted(Comparator< ? super T> comparator);
+
+       /**
+        * Automatically close the channel after the maxSize number of elements 
is
+        * received.
+        * 
+        * @param maxSize Maximum number of elements has been received
+        * @return Builder style (can be a new or the same object)
+        */
+       PushStream<T> limit(long maxSize);
+
+       /**
+        * Skip a number of events in the channel.
+        * 
+        * @param n number of elements to skip
+        * @return Builder style (can be a new or the same object)
+        */
+       PushStream<T> skip(long n);
+
+       /**
+        * Execute the downstream events in up to n background threads. If more
+        * requests are outstanding apply delay * nr of delayed threads back
+        * pressure. A downstream channel that is closed or throws an exception 
will
+        * cause all execution to cease and the stream to close
+        * 
+        * @param n number of simultaneous background threads to use
+        * @param delay Nr of ms/thread that is queued back pressure
+        * @param e an executor to use for the background threads.
+        * @return Builder style (can be a new or the same object)
+        * @throws IllegalArgumentException if the number of threads is < 1 or 
the
+        *             delay is < 0
+        * @throws NullPointerException if the Executor is null
+        */
+       PushStream<T> fork(int n, int delay, Executor e)
+                       throws IllegalArgumentException, NullPointerException;
+
+       /**
+        * Buffer the events in a queue using default values for the queue size 
and
+        * other behaviours. Buffered work will be processed asynchronously in 
the
+        * rest of the chain. Buffering also blocks the transmission of back
+        * pressure to previous elements in the chain, although back pressure is
+        * honoured by the buffer.
+        * <p>
+        * Buffers are useful for "bursty" event sources which produce a number 
of
+        * events close together, then none for some time. These bursts can
+        * sometimes overwhelm downstream event consumers. Buffering will not,
+        * however, protect downstream components from a source which produces
+        * events faster than they can be consumed. For fast sources
+        * {@link #filter(Predicate)} and {@link #coalesce(int, Function)}
+        * {@link #fork(int, int, Executor)} are better choices.
+        * 
+        * @return Builder style (can be a new or the same object)
+        */
+       PushStream<T> buffer();
+
+       /**
+        * Build a buffer to enqueue events in a queue using custom values for 
the
+        * queue size and other behaviours. Buffered work will be processed
+        * asynchronously in the rest of the chain. Buffering also blocks the
+        * transmission of back pressure to previous elements in the chain, 
although
+        * back pressure is honoured by the buffer.
+        * <p>
+        * Buffers are useful for "bursty" event sources which produce a number 
of
+        * events close together, then none for some time. These bursts can
+        * sometimes overwhelm downstream event consumers. Buffering will not,
+        * however, protect downstream components from a source which produces
+        * events faster than they can be consumed. For fast sources
+        * {@link #filter(Predicate)} and {@link #coalesce(int, Function)}
+        * {@link #fork(int, int, Executor)} are better choices.
+        * <p>
+        * Buffers are also useful as "circuit breakers" in the pipeline. If a
+        * {@link QueuePolicyOption#FAIL} is used then a full buffer will 
trigger
+        * the stream to close, preventing an event storm from reaching the 
client.
+        * 
+        * @param parallelism
+        * @param executor
+        * @param queue
+        * @param queuePolicy
+        * @param pushbackPolicy
+        * @return Builder style (can be a new or the same object)
+        */
+       <U extends BlockingQueue<PushEvent< ? extends T>>> 
PushStreamBuilder<T,U> buildBuffer();
+
+       /**
+        * Merge in the events from another source. The resulting channel is not
+        * closed until this channel and the channel from the source are closed.
+        * 
+        * @param source The source to merge in.
+        * @return Builder style (can be a new or the same object)
+        */
+       PushStream<T> merge(PushEventSource< ? extends T> source);
+
+       /**
+        * Merge in the events from another PushStream. The resulting channel 
is not
+        * closed until this channel and the channel from the source are closed.
+        * 
+        * @param source The source to merge in.
+        * @return Builder style (can be a new or the same object)
+        */
+       PushStream<T> merge(PushStream< ? extends T> source);
+
+       /**
+        * Split the events to different streams based on a predicate. If the
+        * predicate is true, the event is dispatched to that channel on the 
same
+        * position. All predicates are tested for every event.
+        * <p>
+        * This method differs from other methods of AsyncStream in three
+        * significant ways:
+        * <ul>
+        * <li>The return value contains multiple streams.</li>
+        * <li>This stream will only close when all of these child streams have
+        * closed.</li>
+        * <li>Event delivery is made to all open children that accept the 
event.
+        * </li>
+        * </ul>
+        * 
+        * @param predicates the predicates to test
+        * @return streams that map to the predicates
+        */
+       @SuppressWarnings("unchecked")
+       PushStream<T>[] split(Predicate< ? super T>... predicates);
+
+       /**
+        * Ensure that any events are delivered sequentially. That is, no
+        * overlapping calls downstream. This can be used to turn a forked 
stream
+        * (where for example a heavy conversion is done in multiple threads) 
back
+        * into a sequential stream so a reduce is simple to do.
+        * 
+        * @return Builder style (can be a new or the same object)
+        */
+       PushStream<T> sequential();
+
+       /**
+        * Coalesces a number of events into a new type of event. The input 
events
+        * are forwarded to a accumulator function. This function returns an
+        * Optional. If the optional is present, it's value is send downstream,
+        * otherwise it is ignored.
+        * 
+        * @param f
+        * @return Builder style (can be a new or the same object)
+        */
+       <R> PushStream<R> coalesce(Function< ? super T,Optional<R>> f);
+
+       /**
+        * Coalesces a number of events into a new type of event. A fixed 
number of
+        * input events are forwarded to a accumulator function. This function
+        * returns new event data to be forwarded on.
+        * 
+        * @param count
+        * @param f
+        * @return Builder style (can be a new or the same object)
+        */
+       public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> 
f);
+
+       /**
+        * Coalesces a number of events into a new type of event. A variable 
number
+        * of input events are forwarded to a accumulator function. The number 
of
+        * events to be forwarded is determined by calling the count function. 
The
+        * accumulator function then returns new event data to be forwarded on.
+        * 
+        * @param count
+        * @param f
+        * @return Builder style (can be a new or the same object)
+        */
+       public <R> PushStream<R> coalesce(IntSupplier count,
+                       Function<Collection<T>,R> f);
+
+       /**
+        * Buffers a number of events over a fixed time interval and then 
forwards
+        * the events to an accumulator function. This function returns new 
event
+        * data to be forwarded on. Note that:
+        * <ul>
+        * <li>The collection forwarded to the accumulator function will be 
empty if
+        * no events arrived during the time interval.</li>
+        * <li>The accumulator function will be run and the forwarded event
+        * delivered as a different task, (and therefore potentially on a 
different
+        * thread) from the one that delivered the event to this {@link 
PushStream}.
+        * </li>
+        * <li>Due to the buffering and asynchronous delivery required, this 
method
+        * prevents the propagation of back-pressure to earlier stages</li>
+        * </ul>
+        * 
+        * @param d
+        * @param f
+        * @return Builder style (can be a new or the same object)
+        */
+       <R> PushStream<R> window(Duration d, Function<Collection<T>,R> f);
+
+       /**
+        * Buffers a number of events over a fixed time interval and then 
forwards
+        * the events to an accumulator function. This function returns new 
event
+        * data to be forwarded on. Note that:
+        * <ul>
+        * <li>The collection forwarded to the accumulator function will be 
empty if
+        * no events arrived during the time interval.</li>
+        * <li>The accumulator function will be run and the forwarded event
+        * delivered by a task given to the supplied executor.</li>
+        * <li>Due to the buffering and asynchronous delivery required, this 
method
+        * prevents the propagation of back-pressure to earlier stages</li>
+        * </ul>
+        * 
+        * @param d
+        * @param executor
+        * @param f
+        * @return Builder style (can be a new or the same object)
+        */
+       <R> PushStream<R> window(Duration d, Executor executor,
+                       Function<Collection<T>,R> f);
+
+       /**
+        * Buffers a number of events over a variable time interval and then
+        * forwards the events to an accumulator function. The length of time 
over
+        * which events are buffered is determined by the time function. A 
maximum
+        * number of events can also be requested, if this number of events is
+        * reached then the accumulator will be called early. The accumulator
+        * function returns new event data to be forwarded on. It is also given 
the
+        * length of time for which the buffer accumulated data. This may be 
less
+        * than the requested interval if the buffer reached the maximum number 
of
+        * requested events early. Note that:
+        * <ul>
+        * <li>The collection forwarded to the accumulator function will be 
empty if
+        * no events arrived during the time interval.</li>
+        * <li>The accumulator function will be run and the forwarded event
+        * delivered as a different task, (and therefore potentially on a 
different
+        * thread) from the one that delivered the event to this {@link 
PushStream}.
+        * </li>
+        * <li>Due to the buffering and asynchronous delivery required, this 
method
+        * prevents the propagation of back-pressure to earlier stages</li>
+        * <li>If the window finishes by hitting the maximum number of events 
then
+        * the remaining time in the window will be applied as back-pressure to 
the
+        * previous stage, attempting to slow the producer to the expected 
windowing
+        * threshold.</li>
+        * </ul>
+        * 
+        * @param timeSupplier
+        * @param maxEvents
+        * @param f
+        * @return Builder style (can be a new or the same object)
+        */
+       <R> PushStream<R> window(Supplier<Duration> timeSupplier,
+                       IntSupplier maxEvents, BiFunction<Long,Collection<T>,R> 
f);
+
+       /**
+        * Buffers a number of events over a variable time interval and then
+        * forwards the events to an accumulator function. The length of time 
over
+        * which events are buffered is determined by the time function. A 
maximum
+        * number of events can also be requested, if this number of events is
+        * reached then the accumulator will be called early. The accumulator
+        * function returns new event data to be forwarded on. It is also given 
the
+        * length of time for which the buffer accumulated data. This may be 
less
+        * than the requested interval if the buffer reached the maximum number 
of
+        * requested events early. Note that:
+        * <ul>
+        * <li>The collection forwarded to the accumulator function will be 
empty if
+        * no events arrived during the time interval.</li>
+        * <li>The accumulator function will be run and the forwarded event
+        * delivered as a different task, (and therefore potentially on a 
different
+        * thread) from the one that delivered the event to this {@link 
PushStream}.
+        * </li>
+        * <li>If the window finishes by hitting the maximum number of events 
then
+        * the remaining time in the window will be applied as back-pressure to 
the
+        * previous stage, attempting to slow the producer to the expected 
windowing
+        * threshold.</li>
+        * </ul>
+        * 
+        * @param timeSupplier
+        * @param maxEvents
+        * @param executor
+        * @param f
+        * @return Builder style (can be a new or the same object)
+        */
+       <R> PushStream<R> window(Supplier<Duration> timeSupplier,
+                       IntSupplier maxEvents, Executor executor,
+                       BiFunction<Long,Collection<T>,R> f);
+
+       /**
+        * Execute the action for each event received until the channel is 
closed.
+        * This is a terminating method, the returned promise is resolved when 
the
+        * channel closes.
+        * <p>
+        * This is a <strong>terminal operation</strong>
+        * 
+        * @param action The action to perform
+        * @return A promise that is resolved when the channel closes.
+        */
+       Promise<Void> forEach(Consumer< ? super T> action);
+
+       /**
+        * Collect the payloads in an Object array after the channel is closed. 
This
+        * is a terminating method, the returned promise is resolved when the
+        * channel is closed.
+        * <p>
+        * This is a <strong>terminal operation</strong>
+        * 
+        * @return A promise that is resolved with all the payloads received 
over
+        *         the channel
+        */
+       Promise<Object[]> toArray();
+
+       /**
+        * Collect the payloads in an Object array after the channel is closed. 
This
+        * is a terminating method, the returned promise is resolved when the
+        * channel is closed. The type of the array is handled by the caller 
using a
+        * generator function that gets the length of the desired array.
+        * <p>
+        * This is a <strong>terminal operation</strong>
+        * 
+        * @param generator
+        * @return A promise that is resolved with all the payloads received 
over
+        *         the channel
+        */
+       <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator);
+
+       /**
+        * Standard reduce, see Stream. The returned promise will be resolved 
when
+        * the channel closes.
+        * <p>
+        * This is a <strong>terminal operation</strong>
+        * 
+        * @param identity The identity/begin value
+        * @param accumulator The accumulator
+        * @return A
+        */
+       Promise<T> reduce(T identity, BinaryOperator<T> accumulator);
+
+       /**
+        * Standard reduce without identity, so the return is an Optional. The
+        * returned promise will be resolved when the channel closes.
+        * <p>
+        * This is a <strong>terminal operation</strong>
+        * 
+        * @param accumulator The accumulator
+        * @return an Optional
+        */
+       Promise<Optional<T>> reduce(BinaryOperator<T> accumulator);
+
+       /**
+        * Standard reduce with identity, accumulator and combiner. The returned
+        * promise will be resolved when the channel closes.
+        * <p>
+        * This is a <strong>terminal operation</strong>
+        * 
+        * @param identity
+        * @param accumulator
+        * @param combiner combines to U's into one U (e.g. how combine two 
lists)
+        * @return The promise
+        */
+       <U> Promise<U> reduce(U identity, BiFunction<U, ? super T,U> 
accumulator,
+                       BinaryOperator<U> combiner);
+
+       /**
+        * See Stream. Will resolve onces the channel closes.
+        * <p>
+        * This is a <strong>terminal operation</strong>
+        * 
+        * @param collector
+        * @return A Promise representing the collected results
+        */
+       <R, A> Promise<R> collect(Collector< ? super T,A,R> collector);
+
+       /**
+        * See Stream. Will resolve onces the channel closes.
+        * <p>
+        * This is a <strong>terminal operation</strong>
+        * 
+        * @param comparator
+        * @return A Promise representing the minimum value, or null if no 
values
+        *         are seen before the end of the stream
+        */
+       Promise<Optional<T>> min(Comparator< ? super T> comparator);
+
+       /**
+        * See Stream. Will resolve onces the channel closes.
+        * <p>
+        * This is a <strong>terminal operation</strong>
+        * 
+        * @param comparator
+        * @return A Promise representing the maximum value, or null if no 
values
+        *         are seen before the end of the stream
+        */
+       Promise<Optional<T>> max(Comparator< ? super T> comparator);
+
+       /**
+        * See Stream. Will resolve onces the channel closes.
+        * <p>
+        * This is a <strong>terminal operation</strong>
+        * 
+        * @return A Promise representing the number of values in the stream
+        */
+       Promise<Long> count();
+
+       /**
+        * Close the channel and resolve the promise with true when the 
predicate
+        * matches a payload. If the channel is closed before the predicate 
matches,
+        * the promise is resolved with false.
+        * <p>
+        * This is a <strong>short circuiting terminal operation</strong>
+        * 
+        * @param predicate
+        * @return A Promise that will resolve when an event matches the 
predicate,
+        *         or the end of the stream is reached
+        */
+       Promise<Boolean> anyMatch(Predicate< ? super T> predicate);
+
+       /**
+        * Closes the channel and resolve the promise with false when the 
predicate
+        * does not matches a pay load.If the channel is closed before, the 
promise
+        * is resolved with true.
+        * <p>
+        * This is a <strong>short circuiting terminal operation</strong>
+        * 
+        * @param predicate
+        * @return A Promise that will resolve when an event fails to match the
+        *         predicate, or the end of the stream is reached
+        */
+       Promise<Boolean> allMatch(Predicate< ? super T> predicate);
+
+       /**
+        * Closes the channel and resolve the promise with false when the 
predicate
+        * matches any pay load. If the channel is closed before, the promise is
+        * resolved with true.
+        * <p>
+        * This is a <strong>short circuiting terminal operation</strong>
+        * 
+        * @param predicate
+        * @return A Promise that will resolve when an event matches the 
predicate,
+        *         or the end of the stream is reached
+        */
+       Promise<Boolean> noneMatch(Predicate< ? super T> predicate);
+
+       /**
+        * Close the channel and resolve the promise with the first element. If 
the
+        * channel is closed before, the Optional will have no value.
+        * 
+        * @return a promise
+        */
+       Promise<Optional<T>> findFirst();
+
+       /**
+        * Close the channel and resolve the promise with the first element. If 
the
+        * channel is closed before, the Optional will have no value.
+        * <p>
+        * This is a <strong>terminal operation</strong>
+        * 
+        * @return a promise
+        */
+       Promise<Optional<T>> findAny();
+
+       /**
+        * Pass on each event to another consumer until the stream is closed.
+        * <p>
+        * This is a <strong>terminal operation</strong>
+        * 
+        * @param action
+        * @return a promise
+        */
+       Promise<Long> forEachEvent(PushEventConsumer< ? super T> action);
+
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+/**
+ * A Builder for a PushStream. This Builder extends the support of a standard
+ * BufferBuilder by allowing the PushStream to be unbuffered.
+ * 
+ *
+ * @param <T> The type of objects in the {@link PushEvent}
+ * @param <U> The type of the Queue used in the user specified buffer
+ */
+public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? 
extends T>>>
+               extends BufferBuilder<PushStream<T>,T,U> {
+
+       /**
+        * Tells this {@link PushStreamBuilder} to create an unbuffered stream 
which
+        * delivers events directly to its consumer using the incoming delivery
+        * thread.
+        * 
+        * @return the builder
+        */
+       PushStreamBuilder<T,U> unbuffered();
+
+       /*
+        * Overridden methods to allow the covariant return of a 
PushStreamBuilder
+        */
+
+       @Override
+       PushStreamBuilder<T,U> withBuffer(U queue);
+
+       @Override
+       PushStreamBuilder<T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy);
+
+       @Override
+       PushStreamBuilder<T,U> withQueuePolicy(QueuePolicyOption 
queuePolicyOption);
+
+       @Override
+       PushStreamBuilder<T,U> withPushbackPolicy(
+                       PushbackPolicy<T,U> pushbackPolicy);
+
+       @Override
+       PushStreamBuilder<T,U> withPushbackPolicy(
+                       PushbackPolicyOption pushbackPolicyOption, long time);
+
+       @Override
+       PushStreamBuilder<T,U> withParallelism(int parallelism);
+
+       @Override
+       PushStreamBuilder<T,U> withExecutor(Executor executor);
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+
+class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends 
T>>>
+               extends AbstractBufferBuilder<PushStream<T>,T,U>
+               implements PushStreamBuilder<T,U> {
+
+       private final PushStreamProvider        psp;
+       private final PushEventSource<T>                eventSource;
+       private final Executor                                  
previousExecutor;
+
+       private boolean                                                 
unbuffered;
+
+       PushStreamBuilderImpl(PushStreamProvider psp, Executor defaultExecutor,
+                       PushEventSource<T> eventSource) {
+               this.psp = psp;
+               this.previousExecutor = defaultExecutor;
+               this.eventSource = eventSource;
+               this.worker = defaultExecutor;
+       }
+
+       @Override
+       public PushStreamBuilder<T,U> withBuffer(U queue) {
+               unbuffered = false;
+               return (PushStreamBuilder<T,U>) super.withBuffer(queue);
+       }
+
+       @Override
+       public PushStreamBuilder<T,U> withQueuePolicy(
+                       QueuePolicy<T,U> queuePolicy) {
+               unbuffered = false;
+               return (PushStreamBuilder<T,U>) 
super.withQueuePolicy(queuePolicy);
+       }
+
+       @Override
+       public PushStreamBuilder<T,U> withQueuePolicy(
+                       QueuePolicyOption queuePolicyOption) {
+               unbuffered = false;
+               return (PushStreamBuilder<T,U>) super.withQueuePolicy(
+                               queuePolicyOption);
+       }
+
+       @Override
+       public PushStreamBuilder<T,U> withPushbackPolicy(
+                       PushbackPolicy<T,U> pushbackPolicy) {
+               unbuffered = false;
+               return (PushStreamBuilder<T,U>) super.withPushbackPolicy(
+                               pushbackPolicy);
+       }
+
+       @Override
+       public PushStreamBuilder<T,U> withPushbackPolicy(
+                       PushbackPolicyOption pushbackPolicyOption, long time) {
+               unbuffered = false;
+               return (PushStreamBuilder<T,U>) super.withPushbackPolicy(
+                               pushbackPolicyOption, time);
+       }
+
+       @Override
+       public PushStreamBuilder<T,U> withParallelism(int parallelism) {
+               unbuffered = false;
+               return (PushStreamBuilder<T,U>) 
super.withParallelism(parallelism);
+       }
+
+       @Override
+       public PushStreamBuilder<T,U> withExecutor(Executor executor) {
+               unbuffered = false;
+               return (PushStreamBuilder<T,U>) super.withExecutor(executor);
+       }
+
+       @Override
+       public PushStreamBuilder<T,U> unbuffered() {
+               unbuffered = true;
+               return this;
+       }
+
+       @Override
+       public PushStream<T> create() {
+               if (unbuffered) {
+                       return psp.createUnbufferedStream(eventSource, 
previousExecutor);
+               } else {
+                       return psp.createStream(eventSource, concurrency, 
worker, buffer,
+                                       bufferingPolicy, backPressure);
+               }
+       }
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,586 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.CLOSED;
+import static org.osgi.util.pushstream.PushEvent.data;
+import static org.osgi.util.pushstream.PushEvent.error;
+import static org.osgi.util.pushstream.PushbackPolicyOption.LINEAR;
+import static org.osgi.util.pushstream.QueuePolicyOption.FAIL;
+
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import org.apache.aries.pushstream.BufferedPushStreamImpl;
+import org.apache.aries.pushstream.SimplePushEventSourceImpl;
+import org.apache.aries.pushstream.UnbufferedPushStreamImpl;
+
+/**
+ * A factory for {@link PushStream} instances, and utility methods for handling
+ * {@link PushEventSource}s and {@link PushEventConsumer}s
+ */
+public final class PushStreamProvider {
+
+       private final Lock                                      lock    = new 
ReentrantLock(true);
+
+       private int                                                     
schedulerReferences;
+
+       private ScheduledExecutorService        scheduler;
+
+       private ScheduledExecutorService acquireScheduler() {
+               try {
+                       lock.lockInterruptibly();
+                       try {
+                               schedulerReferences += 1;
+
+                               if (schedulerReferences == 1) {
+                                       scheduler = 
Executors.newSingleThreadScheduledExecutor();
+                               }
+                               return scheduler;
+                       } finally {
+                               lock.unlock();
+                       }
+               } catch (InterruptedException e) {
+                       throw new IllegalStateException("Unable to acquire the 
Scheduler",
+                                       e);
+               }
+       }
+
+       private void releaseScheduler() {
+               try {
+                       lock.lockInterruptibly();
+                       try {
+                               schedulerReferences -= 1;
+
+                               if (schedulerReferences == 0) {
+                                       scheduler.shutdown();
+                                       scheduler = null;
+                               }
+                       } finally {
+                               lock.unlock();
+                       }
+               } catch (InterruptedException e) {
+                       // TODO Auto-generated catch block
+                       e.printStackTrace();
+               }
+       }
+
+       /**
+        * Create a stream with the default configured buffer, executor size, 
queue,
+        * queue policy and pushback policy. This is equivalent to calling
+        * 
+        * <code>
+        *   buildStream(source).create();
+        * </code>
+        * 
+        * <p>
+        * This stream will be buffered from the event producer, and will honour
+        * back pressure even if the source does not.
+        * 
+        * <p>
+        * Buffered streams are useful for "bursty" event sources which produce 
a
+        * number of events close together, then none for some time. These 
bursts
+        * can sometimes overwhelm downstream processors. Buffering will not,
+        * however, protect downstream components from a source which produces
+        * events faster (on average) than they can be consumed.
+        * 
+        * <p>
+        * Event delivery will not begin until a terminal operation is reached 
on
+        * the chain of AsyncStreams. Once a terminal operation is reached the
+        * stream will be connected to the event source.
+        * 
+        * @param eventSource
+        * @return A {@link PushStream} with a default initial buffer
+        */
+       public <T> PushStream<T> createStream(PushEventSource<T> eventSource) {
+               return createStream(eventSource, 1, null, new 
ArrayBlockingQueue<>(32),
+                               FAIL.getPolicy(), LINEAR.getPolicy(1000));
+       }
+       
+       /**
+        * Builds a push stream with custom configuration.
+        * 
+        * <p>
+        * 
+        * The resulting {@link PushStream} may be buffered or unbuffered 
depending
+        * on how it is configured.
+        * 
+        * @param eventSource The source of the events
+        * 
+        * @return A {@link PushStreamBuilder} for the stream
+        */
+       public <T, U extends BlockingQueue<PushEvent< ? extends T>>> 
PushStreamBuilder<T,U> buildStream(
+                       PushEventSource<T> eventSource) {
+               return new PushStreamBuilderImpl<T,U>(this, null, eventSource);
+       }
+       
+       @SuppressWarnings({
+                       "rawtypes", "unchecked"
+       })
+       <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStream<T> 
createStream(
+                       PushEventSource<T> eventSource, int parallelism, 
Executor executor,
+                       U queue, QueuePolicy<T,U> queuePolicy,
+                       PushbackPolicy<T,U> pushbackPolicy) {
+
+               if (eventSource == null) {
+                       throw new NullPointerException("There is no source of 
events");
+               }
+
+               if (parallelism < 0) {
+                       throw new IllegalArgumentException(
+                                       "The supplied parallelism cannot be 
less than zero. It was "
+                                                       + parallelism);
+               } else if (parallelism == 0) {
+                       parallelism = 1;
+               }
+
+               boolean closeExecutorOnClose;
+               Executor toUse;
+               if (executor == null) {
+                       toUse = Executors.newFixedThreadPool(parallelism);
+                       closeExecutorOnClose = true;
+               } else {
+                       toUse = executor;
+                       closeExecutorOnClose = false;
+               }
+
+               if (queue == null) {
+                       queue = (U) new ArrayBlockingQueue(32);
+               }
+
+               if (queuePolicy == null) {
+                       queuePolicy = FAIL.getPolicy();
+               }
+
+               if (pushbackPolicy == null) {
+                       pushbackPolicy = LINEAR.getPolicy(1000);
+               }
+
+               @SuppressWarnings("resource")
+               PushStream<T> stream = new BufferedPushStreamImpl<>(this,
+                               acquireScheduler(), queue, parallelism, toUse, 
queuePolicy,
+                               pushbackPolicy, aec -> {
+                                       try {
+                                               return eventSource.open(aec);
+                                       } catch (Exception e) {
+                                               throw new RuntimeException(
+                                                               "Unable to 
connect to event source", e);
+                                       }
+                               });
+
+               stream = stream.onClose(() -> {
+                       if (closeExecutorOnClose) {
+                               ((ExecutorService) toUse).shutdown();
+                       }
+                       releaseScheduler();
+               }).map(Function.identity());
+               return stream;
+       }
+
+       <T> PushStream<T> createUnbufferedStream(PushEventSource<T> eventSource,
+                       Executor executor) {
+
+               boolean closeExecutorOnClose;
+               Executor toUse;
+               if (executor == null) {
+                       toUse = Executors.newFixedThreadPool(2);
+                       closeExecutorOnClose = true;
+               } else {
+                       toUse = executor;
+                       closeExecutorOnClose = false;
+               }
+
+               @SuppressWarnings("resource")
+               PushStream<T> stream = new UnbufferedPushStreamImpl<>(this, 
toUse,
+                               acquireScheduler(), aec -> {
+                                       try {
+                                               return eventSource.open(aec);
+                                       } catch (Exception e) {
+                                               throw new RuntimeException(
+                                                               "Unable to 
connect to event source", e);
+                                       }
+                               });
+
+               stream = stream.onClose(() -> {
+                       if (closeExecutorOnClose) {
+                               ((ExecutorService) toUse).shutdown();
+                       }
+                       releaseScheduler();
+               }).map(Function.identity());
+
+               return stream;
+       }
+
+       /**
+        * Convert an {@link PushStream} into an {@link PushEventSource}. The 
first
+        * call to {@link PushEventSource#open(PushEventConsumer)} will begin 
event
+        * processing.
+        * 
+        * The {@link PushEventSource} will remain active until the backing 
stream
+        * is closed, and permits multiple consumers to
+        * {@link PushEventSource#open(PushEventConsumer)} it.
+        * 
+        * This is equivalent to: <code>
+        *   buildEventSourceFromStream(stream).create();
+        * </code>
+        * 
+        * @param stream
+        * @return a {@link PushEventSource} backed by the {@link PushStream}
+        */
+       public <T> PushEventSource<T> createEventSourceFromStream(
+                       PushStream<T> stream) {
+               return buildEventSourceFromStream(stream).create();
+       }
+
+       /**
+        * Convert an {@link PushStream} into an {@link PushEventSource}. The 
first
+        * call to {@link PushEventSource#open(PushEventConsumer)} will begin 
event
+        * processing.
+        * 
+        * The {@link PushEventSource} will remain active until the backing 
stream
+        * is closed, and permits multiple consumers to
+        * {@link PushEventSource#open(PushEventConsumer)} it.
+        * 
+        * @param stream
+        * 
+        * @return a {@link PushEventSource} backed by the {@link PushStream}
+        */
+       public <T, U extends BlockingQueue<PushEvent< ? extends T>>> 
BufferBuilder<PushEventSource<T>,T,U> buildEventSourceFromStream(
+                       PushStream<T> stream) {
+               return new AbstractBufferBuilder<PushEventSource<T>,T,U>() {
+                       @Override
+                       public PushEventSource<T> create() {
+                               SimplePushEventSource<T> spes = 
createSimplePushEventSource(
+                                               concurrency, worker, buffer, 
bufferingPolicy, () -> {
+                                                       try {
+                                                               stream.close();
+                                                       } catch (Exception e) {
+                                                               // TODO 
Auto-generated catch block
+                                                               
e.printStackTrace();
+                                                       }
+                                               });
+                               spes.connectPromise()
+                                               .then(p -> stream.forEach(t -> 
spes.publish(t))
+                                                               .onResolve(() 
-> spes.close()));
+                               return spes;
+                       }
+               };
+       }
+       
+
+       /**
+        * Create a {@link SimplePushEventSource} with the supplied type and 
default
+        * buffering behaviours. The SimplePushEventSource will respond to back
+        * pressure requests from the consumers connected to it.
+        * 
+        * This is equivalent to: <code>
+        *   buildSimpleEventSource(type).create();
+        * </code>
+        * 
+        * @param type
+        * @return a {@link SimplePushEventSource}
+        */
+       public <T> SimplePushEventSource<T> createSimpleEventSource(Class<T> 
type) {
+               return createSimplePushEventSource(1, null,
+                               new ArrayBlockingQueue<>(32),
+                               FAIL.getPolicy(), () -> { /* Nothing else to do 
*/ });
+       }
+       
+       /**
+        * 
+        * Build a {@link SimplePushEventSource} with the supplied type and 
custom
+        * buffering behaviours. The SimplePushEventSource will respond to back
+        * pressure requests from the consumers connected to it.
+        * 
+        * @param type
+        * 
+        * @return a {@link SimplePushEventSource}
+        */
+
+       public <T, U extends BlockingQueue<PushEvent< ? extends T>>> 
BufferBuilder<SimplePushEventSource<T>,T,U> buildSimpleEventSource(
+                       Class<T> type) {
+               return new 
AbstractBufferBuilder<SimplePushEventSource<T>,T,U>() {
+                       @Override
+                       public SimplePushEventSource<T> create() {
+                               return createSimplePushEventSource(concurrency, 
worker, buffer,
+                                               bufferingPolicy, () -> { /* 
Nothing else to do */ });
+                       }
+               };
+       }
+       
+       @SuppressWarnings({
+                       "unchecked", "rawtypes"
+       })
+       <T, U extends BlockingQueue<PushEvent< ? extends T>>> 
SimplePushEventSource<T> createSimplePushEventSource(
+                       int parallelism, Executor executor, U queue,
+                       QueuePolicy<T,U> queuePolicy, Runnable onClose) {
+
+               if (parallelism < 0) {
+                       throw new IllegalArgumentException(
+                                       "The supplied parallelism cannot be 
less than zero. It was "
+                                                       + parallelism);
+               } else if (parallelism == 0) {
+                       parallelism = 1;
+               }
+
+               boolean closeExecutorOnClose;
+               Executor toUse;
+               if (executor == null) {
+                       toUse = Executors.newFixedThreadPool(2);
+                       closeExecutorOnClose = true;
+               } else {
+                       toUse = executor;
+                       closeExecutorOnClose = false;
+               }
+
+               if (queue == null) {
+                       queue = (U) new ArrayBlockingQueue(32);
+               }
+
+               if (queuePolicy == null) {
+                       queuePolicy = FAIL.getPolicy();
+               }
+
+               SimplePushEventSourceImpl<T,U> spes = new 
SimplePushEventSourceImpl<T,U>(
+                               toUse, acquireScheduler(), queuePolicy, queue, 
parallelism,
+                               () -> {
+                                       try {
+                                               onClose.run();
+                                       } catch (Exception e) {
+                                               // TODO log this?
+                                       }
+                                       if (closeExecutorOnClose) {
+                                               ((ExecutorService) 
toUse).shutdown();
+                                       }
+                                       releaseScheduler();
+                               });
+               return spes;
+       }
+
+       /**
+        * Create a buffered {@link PushEventConsumer} with the default 
configured
+        * buffer, executor size, queue, queue policy and pushback policy. This 
is
+        * equivalent to calling
+        * 
+        * <code>
+        *   buildBufferedConsumer(delegate).create();
+        * </code>
+        * 
+        * <p>
+        * The returned consumer will be buffered from the event source, and 
will
+        * honour back pressure requests from its delegate even if the event 
source
+        * does not.
+        * 
+        * <p>
+        * Buffered consumers are useful for "bursty" event sources which 
produce a
+        * number of events close together, then none for some time. These 
bursts
+        * can sometimes overwhelm the consumer. Buffering will not, however,
+        * protect downstream components from a source which produces events 
faster
+        * than they can be consumed.
+        * 
+        * @param delegate
+        * @return a {@link PushEventConsumer} with a buffer directly before it
+        */
+       public <T> PushEventConsumer<T> createBufferedConsumer(
+                       PushEventConsumer<T> delegate) {
+               return buildBufferedConsumer(delegate).create();
+       }
+       
+       /**
+        * Build a buffered {@link PushEventConsumer} with custom configuration.
+        * <p>
+        * The returned consumer will be buffered from the event source, and 
will
+        * honour back pressure requests from its delegate even if the event 
source
+        * does not.
+        * <p>
+        * Buffered consumers are useful for "bursty" event sources which 
produce a
+        * number of events close together, then none for some time. These 
bursts
+        * can sometimes overwhelm the consumer. Buffering will not, however,
+        * protect downstream components from a source which produces events 
faster
+        * than they can be consumed.
+        * <p>
+        * Buffers are also useful as "circuit breakers". If a
+        * {@link QueuePolicyOption#FAIL} is used then a full buffer will 
request
+        * that the stream close, preventing an event storm from reaching the
+        * client.
+        * <p>
+        * Note that this buffered consumer will close when it receives a 
terminal
+        * event, or if the delegate returns negative backpressure. No further
+        * events will be propagated after this time.
+        * 
+        * @param delegate
+        * @return a {@link PushEventConsumer} with a buffer directly before it
+        */
+       public <T, U extends BlockingQueue<PushEvent< ? extends T>>> 
BufferBuilder<PushEventConsumer<T>,T,U> buildBufferedConsumer(
+                       PushEventConsumer<T> delegate) {
+               return new AbstractBufferBuilder<PushEventConsumer<T>,T,U>() {
+                       @Override
+                       public PushEventConsumer<T> create() {
+                               PushEventPipe<T> pipe = new PushEventPipe<>();
+                               
+                               createStream(pipe, concurrency, worker, buffer, 
bufferingPolicy, backPressure)
+                                       .forEachEvent(delegate);
+                               
+                               return pipe;
+                       }
+               };
+       }
+
+       static final class PushEventPipe<T>
+                       implements PushEventConsumer<T>, PushEventSource<T> {
+
+               volatile PushEventConsumer< ? super T> delegate;
+
+               @Override
+               public AutoCloseable open(PushEventConsumer< ? super T> pec)
+                               throws Exception {
+                       return () -> { /* Nothing else to do */ };
+               }
+
+               @Override
+               public long accept(PushEvent< ? extends T> event) throws 
Exception {
+                       return delegate.accept(event);
+               }
+
+       }
+
+       /**
+        * Create an Unbuffered {@link PushStream} from a Java {@link Stream} 
The
+        * data from the stream will be pushed into the PushStream 
synchronously as
+        * it is opened. This may make terminal operations blocking unless a 
buffer
+        * has been added to the {@link PushStream}. Care should be taken with
+        * infinite {@link Stream}s to avoid blocking indefinitely.
+        * 
+        * @param items The items to push into the PushStream
+        * @return A PushStream containing the items from the Java Stream
+        */
+       public <T> PushStream<T> streamOf(Stream<T> items) {
+               PushEventSource<T> pes = aec -> {
+                       AtomicBoolean closed = new AtomicBoolean(false);
+
+                       items.mapToLong(i -> {
+                               try {
+                                       long returnValue = closed.get() ? -1 : 
aec.accept(data(i));
+                                       if (returnValue < 0) {
+                                               
aec.accept(PushEvent.<T>close());
+                                       }
+                                       return returnValue;
+                               } catch (Exception e) {
+                                       try {
+                                               
aec.accept(PushEvent.<T>error(e));
+                                       } catch (Exception e2) {/* No further 
events needed */}
+                                       return -1;
+                               }
+                       }).filter(i -> i < 0).findFirst().orElseGet(() -> {
+                               try {
+                                       return aec.accept(PushEvent.<T>close());
+                               } catch (Exception e) {
+                                       return -1;
+                               }
+                       });
+
+                       return () -> closed.set(true);
+               };
+
+               return this.<T> createUnbufferedStream(pes, null);
+       }
+
+       /**
+        * Create an Unbuffered {@link PushStream} from a Java {@link Stream} 
The
+        * data from the stream will be pushed into the PushStream 
asynchronously
+        * using the supplied Executor.
+        * 
+        * @param executor The worker to use to push items from the Stream into 
the
+        *            PushStream
+        * @param items The items to push into the PushStream
+        * @return A PushStream containing the items from the Java Stream
+        */
+       public <T> PushStream<T> streamOf(Executor executor, Stream<T> items) {
+
+               boolean closeExecutorOnClose;
+               Executor toUse;
+               if (executor == null) {
+                       toUse = Executors.newFixedThreadPool(2);
+                       closeExecutorOnClose = true;
+               } else {
+                       toUse = executor;
+                       closeExecutorOnClose = false;
+               }
+
+               @SuppressWarnings("resource")
+               PushStream<T> stream = new 
UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>(
+                               this, toUse, acquireScheduler(), aec -> {
+                                       return () -> { /* No action to take */ 
};
+                               }) {
+
+                       @Override
+                       protected boolean begin() {
+                               if (super.begin()) {
+                                       Iterator<T> it = items.iterator();
+
+                                       toUse.execute(() -> pushData(it));
+
+                                       return true;
+                               }
+                               return false;
+                       }
+
+                       private void pushData(Iterator<T> it) {
+                               while (it.hasNext()) {
+                                       try {
+                                               long returnValue = closed.get() 
== CLOSED ? -1
+                                                               : 
handleEvent(data(it.next()));
+                                               if (returnValue != 0) {
+                                                       if (returnValue < 0) {
+                                                               close();
+                                                               return;
+                                                       } else {
+                                                               
scheduler.schedule(
+                                                                               
() -> toUse.execute(() -> pushData(it)),
+                                                                               
returnValue, MILLISECONDS);
+                                                               return;
+                                                       }
+                                               }
+                                       } catch (Exception e) {
+                                               close(error(e));
+                                       }
+                               }
+                               close();
+                       }
+               };
+
+               stream = stream.onClose(() -> {
+                       if (closeExecutorOnClose) {
+                               ((ExecutorService) toUse).shutdown();
+                       }
+                       releaseScheduler();
+               }).map(Function.identity());
+
+               return stream;
+       }
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import org.osgi.annotation.versioning.ConsumerType;
+
+/**
+ * A {@link PushbackPolicy} is used to calculate how much back pressure to 
apply
+ * based on the current buffer. The {@link PushbackPolicy} will be called after
+ * an event has been queued, and the returned value will be used as back
+ * pressure.
+ * 
+ * @see PushbackPolicyOption
+ * 
+ *
+ * @param <T> The type of the data
+ * @param <U> The type of the queue
+ */
+@ConsumerType
+@FunctionalInterface
+public interface PushbackPolicy<T, U extends BlockingQueue<PushEvent<? extends 
T>>> {
+       
+       /**
+        * Given the current state of the queue, determine the level of back
+        * pressure that should be applied
+        * 
+        * @param queue
+        * @return a back pressure value in nanoseconds
+        * @throws Exception
+        */
+       public long pushback(U queue) throws Exception;
+       
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link PushbackPolicyOption} provides a standard set of simple
+ * {@link PushbackPolicy} implementations.
+ * 
+ * @see PushbackPolicy
+ */
+public enum PushbackPolicyOption {
+
+       /**
+        * Returns a fixed amount of back pressure, independent of how full the
+        * buffer is
+        */
+       FIXED {
+               @Override
+               public <T, U extends BlockingQueue<PushEvent<? extends T>>> 
PushbackPolicy<T, U> getPolicy(long value) {
+                       return q -> value;
+               }
+       },
+       /**
+        * Returns zero back pressure until the buffer is full, then it returns 
a
+        * fixed value
+        */
+       ON_FULL_FIXED {
+               @Override
+               public <T, U extends BlockingQueue<PushEvent<? extends T>>> 
PushbackPolicy<T, U> getPolicy(long value) {
+                       return q -> q.remainingCapacity() == 0 ? value : 0;
+               }
+       },
+       /**
+        * Returns zero back pressure until the buffer is full, then it returns 
an
+        * exponentially increasing amount, starting with the supplied value and
+        * doubling it each time. Once the buffer is no longer full the back
+        * pressure returns to zero.
+        */
+       ON_FULL_EXPONENTIAL {
+               @Override
+               public <T, U extends BlockingQueue<PushEvent<? extends T>>> 
PushbackPolicy<T, U> getPolicy(long value) {
+                       AtomicInteger backoffCount = new AtomicInteger(0);
+                       return q -> {
+                               if (q.remainingCapacity() == 0) {
+                                       return value << 
backoffCount.getAndIncrement();
+                               }
+                               backoffCount.set(0);
+                               return 0;
+                       };
+
+               }
+       },
+       /**
+        * Returns zero back pressure when the buffer is empty, then it returns 
a
+        * linearly increasing amount of back pressure based on how full the 
buffer
+        * is. The maximum value will be returned when the buffer is full.
+        */
+       LINEAR {
+               @Override
+               public <T, U extends BlockingQueue<PushEvent<? extends T>>> 
PushbackPolicy<T, U> getPolicy(long value) {
+                       return q -> {
+                               long remainingCapacity = q.remainingCapacity();
+                               long used = q.size();
+                               return (value * used) / (used + 
remainingCapacity);
+                       };
+               }
+       };
+
+       /**
+        * Create a {@link PushbackPolicy} instance configured with a base back
+        * pressure time in nanoseconds
+        * 
+        * The actual backpressure returned will vary based on the selected
+        * implementation, the base value, and the state of the buffer.
+        * 
+        * @param value
+        * @return A {@link PushbackPolicy} to use
+        */
+       public abstract <T, U extends BlockingQueue<PushEvent<? extends T>>> 
PushbackPolicy<T, U> getPolicy(long value);
+
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.osgi.annotation.versioning.ConsumerType;
+import org.osgi.util.pushstream.PushEvent.EventType;
+
+/**
+ * A {@link QueuePolicy} is used to control how events should be queued in the
+ * current buffer. The {@link QueuePolicy} will be called when an event has
+ * arrived.
+ * 
+ * @see QueuePolicyOption
+ * 
+ *
+ * @param <T> The type of the data
+ * @param <U> The type of the queue
+ */
+
+@ConsumerType
+@FunctionalInterface
+public interface QueuePolicy<T, U extends BlockingQueue<PushEvent<? extends 
T>>> { 
+       
+       /**
+        * Enqueue the event and return the remaining capacity available for 
events
+        * 
+        * @param queue
+        * @param event
+        * @throws Exception If an error ocurred adding the event to the queue. 
This
+        *         exception will cause the connection between the
+        *         {@link PushEventSource} and the {@link PushEventConsumer} to 
be
+        *         closed with an {@link EventType#ERROR}
+        */
+       public void doOffer(U queue, PushEvent<? extends T> event) throws 
Exception;
+       
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * {@link QueuePolicyOption} provides a standard set of simple
+ * {@link QueuePolicy} implementations.
+ * 
+ * @see QueuePolicy
+ */
+public enum QueuePolicyOption {
+       /**
+        * Attempt to add the supplied event to the queue. If the queue is 
unable to
+        * immediately accept the value then discard the value at the head of 
the
+        * queue and try again. Repeat this process until the event is enqueued.
+        */
+       DISCARD_OLDEST {
+               @Override
+               public <T, U extends BlockingQueue<PushEvent<? extends T>>> 
QueuePolicy<T, U> getPolicy() {
+                       return (queue, event) -> {
+                               while (!queue.offer(event)) {
+                                       queue.poll();
+                               }
+                       };
+               }
+       },
+       /**
+        * Attempt to add the supplied event to the queue, blocking until the
+        * enqueue is successful.
+        */
+       BLOCK {
+               @Override
+               public <T, U extends BlockingQueue<PushEvent<? extends T>>> 
QueuePolicy<T, U> getPolicy() {
+                       return (queue, event) -> {
+                               try {
+                                       queue.put(event);
+                               } catch (InterruptedException e) {
+                                       // TODO Auto-generated catch block
+                                       e.printStackTrace();
+                               }
+                       };
+               }
+       },
+       /**
+        * Attempt to add the supplied event to the queue, throwing an 
exception if
+        * the queue is full.
+        */
+       FAIL {
+               @Override
+               public <T, U extends BlockingQueue<PushEvent<? extends T>>> 
QueuePolicy<T, U> getPolicy() {
+                       return (queue, event) -> queue.add(event);
+               }
+       };
+
+       /**
+        * @return a {@link QueuePolicy} implementation
+        */
+       public abstract <T, U extends BlockingQueue<PushEvent<? extends T>>> 
QueuePolicy<T, U> getPolicy();
+
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.promise.Promise;
+
+/**
+ * A {@link SimplePushEventSource} is a helper that makes it simpler to write a
+ * {@link PushEventSource}. Users do not need to manage multiple registrations
+ * to the stream, nor do they have to be concerned with back pressure.
+ *
+ * @param <T> The type of the events produced by this source
+ */
+@ProviderType
+public interface SimplePushEventSource<T>
+               extends PushEventSource<T>, AutoCloseable {
+       /**
+        * Close this source. Calling this method indicates that there will 
never be
+        * any more events published by it. Calling this method sends a close 
event
+        * to all connected consumers. After calling this method any
+        * {@link PushEventConsumer} that tries to {@link 
#open(PushEventConsumer)}
+        * this source will immediately receive a close event.
+        */
+       @Override
+       void close();
+
+       /**
+        * Asynchronously publish an event to this stream and all connected
+        * {@link PushEventConsumer} instances. When this method returns there 
is no
+        * guarantee that all consumers have been notified. Events published by 
a
+        * single thread will maintain their relative ordering, however they 
may be
+        * interleaved with events from other threads.
+        * 
+        * @param t
+        * @throws IllegalStateException if the source is closed
+        */
+       void publish(T t);
+
+       /**
+        * Close this source for now, but potentially reopen it later. Calling 
this
+        * method asynchronously sends a close event to all connected consumers.
+        * After calling this method any {@link PushEventConsumer} that wishes 
may
+        * {@link #open(PushEventConsumer)} this source, and will receive 
subsequent
+        * events.
+        */
+       void endOfStream();
+
+       /**
+        * Close this source for now, but potentially reopen it later. Calling 
this
+        * method asynchronously sends an error event to all connected 
consumers.
+        * After calling this method any {@link PushEventConsumer} that wishes 
may
+        * {@link #open(PushEventConsumer)} this source, and will receive 
subsequent
+        * events.
+        *
+        * @param e the error
+        */
+       void error(Exception e);
+
+       /**
+        * Determine whether there are any {@link PushEventConsumer}s for this
+        * {@link PushEventSource}. This can be used to skip expensive event
+        * creation logic when there are no listeners.
+        * 
+        * @return true if any consumers are currently connected
+        */
+       boolean isConnected();
+
+       /**
+        * This method can be used to delay event generation until an event 
source
+        * has connected. The returned promise will resolve as soon as one or 
more
+        * {@link PushEventConsumer} instances have opened the
+        * SimplePushEventSource.
+        * <p>
+        * The returned promise may already be resolved if this
+        * {@link SimplePushEventSource} already has connected consumers. If the
+        * {@link SimplePushEventSource} is closed before the returned Promise
+        * resolves then it will be failed with an {@link 
IllegalStateException}.
+        * <p>
+        * Note that the connected consumers are able to asynchronously close 
their
+        * connections to this {@link SimplePushEventSource}, and therefore it 
is
+        * possible that once the promise resolves this
+        * {@link SimplePushEventSource} may no longer be connected to any
+        * consumers.
+        * 
+        * @return A promise representing the connection state of this 
EventSource
+        */
+       Promise<Void> connectPromise();
+
+}

Added: 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java?rev=1766040&view=auto
==============================================================================
--- 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java
 (added)
+++ 
aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java
 Fri Oct 21 15:10:51 2016
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+/**
+ * Push Stream Package Version 1.0.
+ * 
+ * <p>
+ * Bundles wishing to use this package must list the package in the
+ * Import-Package header of the bundle's manifest.
+ * 
+ * <p>
+ * Example import for consumers using the API in this package:
+ * <p>
+ * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,2.0)"}
+ * <p>
+ * Example import for providers implementing the API in this package:
+ * <p>
+ * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,1.1)"}
+ * 
+ * @author $Id: 6a28fa0b5c2036486a22a7ca1254729d7848ca43 $
+ */
+
+@Version("1.0")
+package org.osgi.util.pushstream;
+
+import org.osgi.annotation.versioning.Version;


Reply via email to