// Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStream.java
// URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStream.java?rev=1766040&view=auto
// (added, Fri Oct 21 15:10:51 2016; diff range @@ -0,0 +1,586 @@)
/*
 * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.osgi.util.pushstream;

import java.time.Duration;
import java.util.Collection;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.IntSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collector;

import org.osgi.annotation.versioning.ProviderType;
import org.osgi.util.promise.Promise;

/**
 * A Push Stream fulfills the same role as the Java 8 stream but it reverses the
 * control direction. The Java 8 stream is pull based and this is push based. A
 * Push Stream makes it possible to build a pipeline of transformations using a
 * builder kind of model. Just like streams, it provides a number of terminating
 * methods that will actually open the channel and perform the processing until
 * the channel is closed (the source sends a Close event). The results of the
 * processing will be sent to a Promise, just like any error events. A stream
 * can be used multiple times. The Push Stream represents a pipeline. Upstream
 * is in the direction of the source, downstream is in the direction of the
 * terminating method. Events are sent downstream asynchronously with no
 * guarantee for ordering or concurrency. Methods are available to provide
 * serialization of the events and splitting in background threads.
 *
 * @param <T> The Payload type
 */
@ProviderType
public interface PushStream<T> extends AutoCloseable {

    /**
     * Must be run after the channel is closed. This handler will run after the
     * downstream methods have processed the close event and before the upstream
     * methods have closed.
     *
     * @param closeHandler Will be called on close
     * @return This stream
     */
    PushStream<T> onClose(Runnable closeHandler);

    /**
     * Register a handler that is given a {@link Throwable}.
     * <p>
     * NOTE(review): the text previously found here was a verbatim copy of the
     * {@link #onClose(Runnable)} description. Presumably this handler is
     * called with the failure when the channel is closed by an error event,
     * with the same ordering as the close handler - confirm against the
     * implementation.
     *
     * @param closeHandler Receives the {@link Throwable}
     * @return This stream
     */
    PushStream<T> onError(Consumer<? super Throwable> closeHandler);

    /**
     * Only pass events downstream when the predicate tests true.
     *
     * @param predicate The predicate that is tested (not null)
     * @return Builder style (can be a new or the same object)
     */
    PushStream<T> filter(Predicate<? super T> predicate);

    /**
     * Map a payload value.
     *
     * @param mapper The map function
     * @return Builder style (can be a new or the same object)
     */
    <R> PushStream<R> map(Function<? super T, ? extends R> mapper);

    /**
     * Flat map the payload value (turn one event into 0..n events of
     * potentially another type).
     *
     * @param mapper The flat map function
     * @return Builder style (can be a new or the same object)
     */
    <R> PushStream<R> flatMap(
            Function<? super T, ? extends PushStream<? extends R>> mapper);

    /**
     * Remove any duplicates. Notice that this can be expensive in a large
     * stream since it must track previous payloads.
     *
     * @return Builder style (can be a new or the same object)
     */
    PushStream<T> distinct();

    /**
     * Sort the elements, assuming that T extends Comparable. This is of course
     * expensive for large or infinite streams since it requires buffering the
     * stream until close.
     *
     * @return Builder style (can be a new or the same object)
     */
    PushStream<T> sorted();

    /**
     * Sort the elements with the given comparator. This is of course expensive
     * for large or infinite streams since it requires buffering the stream
     * until close.
     *
     * @param comparator The comparator used to order the elements
     * @return Builder style (can be a new or the same object)
     */
    PushStream<T> sorted(Comparator<? super T> comparator);

    /**
     * Automatically close the channel after the maxSize number of elements is
     * received.
     *
     * @param maxSize Maximum number of elements to receive before closing
     * @return Builder style (can be a new or the same object)
     */
    PushStream<T> limit(long maxSize);

    /**
     * Skip a number of events in the channel.
     *
     * @param n number of elements to skip
     * @return Builder style (can be a new or the same object)
     */
    PushStream<T> skip(long n);

    /**
     * Execute the downstream events in up to n background threads. If more
     * requests are outstanding apply delay * nr of delayed threads back
     * pressure. A downstream channel that is closed or throws an exception will
     * cause all execution to cease and the stream to close.
     *
     * @param n number of simultaneous background threads to use
     * @param delay Nr of ms/thread that is queued back pressure
     * @param e an executor to use for the background threads.
     * @return Builder style (can be a new or the same object)
     * @throws IllegalArgumentException if the number of threads is
     *             {@code < 1} or the delay is {@code < 0}
     * @throws NullPointerException if the Executor is null
     */
    PushStream<T> fork(int n, int delay, Executor e)
            throws IllegalArgumentException, NullPointerException;

    /**
     * Buffer the events in a queue using default values for the queue size and
     * other behaviours. Buffered work will be processed asynchronously in the
     * rest of the chain. Buffering also blocks the transmission of back
     * pressure to previous elements in the chain, although back pressure is
     * honoured by the buffer.
     * <p>
     * Buffers are useful for "bursty" event sources which produce a number of
     * events close together, then none for some time. These bursts can
     * sometimes overwhelm downstream event consumers. Buffering will not,
     * however, protect downstream components from a source which produces
     * events faster than they can be consumed. For fast sources
     * {@link #filter(Predicate)}, {@link #coalesce(int, Function)} and
     * {@link #fork(int, int, Executor)} are better choices.
     *
     * @return Builder style (can be a new or the same object)
     */
    PushStream<T> buffer();

    /**
     * Build a buffer to enqueue events in a queue using custom values for the
     * queue size and other behaviours. Buffered work will be processed
     * asynchronously in the rest of the chain. Buffering also blocks the
     * transmission of back pressure to previous elements in the chain, although
     * back pressure is honoured by the buffer.
     * <p>
     * Buffers are useful for "bursty" event sources which produce a number of
     * events close together, then none for some time. These bursts can
     * sometimes overwhelm downstream event consumers. Buffering will not,
     * however, protect downstream components from a source which produces
     * events faster than they can be consumed. For fast sources
     * {@link #filter(Predicate)}, {@link #coalesce(int, Function)} and
     * {@link #fork(int, int, Executor)} are better choices.
     * <p>
     * Buffers are also useful as "circuit breakers" in the pipeline. If a
     * {@link QueuePolicyOption#FAIL} is used then a full buffer will trigger
     * the stream to close, preventing an event storm from reaching the client.
     * <p>
     * This method takes no arguments: the parallelism, executor, queue, queue
     * policy and pushback policy are supplied through the {@code with...}
     * methods of the returned {@link PushStreamBuilder}.
     *
     * @param <U> The type of the queue used by the buffer
     * @return A builder used to configure the buffer
     */
    <U extends BlockingQueue<PushEvent<? extends T>>> PushStreamBuilder<T, U> buildBuffer();

    /**
     * Merge in the events from another source. The resulting channel is not
     * closed until this channel and the channel from the source are closed.
     *
     * @param source The source to merge in.
     * @return Builder style (can be a new or the same object)
     */
    PushStream<T> merge(PushEventSource<? extends T> source);

    /**
     * Merge in the events from another PushStream. The resulting channel is not
     * closed until this channel and the channel from the source are closed.
     *
     * @param source The source to merge in.
     * @return Builder style (can be a new or the same object)
     */
    PushStream<T> merge(PushStream<? extends T> source);

    /**
     * Split the events to different streams based on a predicate. If the
     * predicate is true, the event is dispatched to that channel on the same
     * position. All predicates are tested for every event.
     * <p>
     * This method differs from other methods of PushStream in three
     * significant ways:
     * <ul>
     * <li>The return value contains multiple streams.</li>
     * <li>This stream will only close when all of these child streams have
     * closed.</li>
     * <li>Event delivery is made to all open children that accept the event.
     * </li>
     * </ul>
     *
     * @param predicates the predicates to test
     * @return streams that map to the predicates
     */
    @SuppressWarnings("unchecked")
    PushStream<T>[] split(Predicate<? super T>... predicates);

    /**
     * Ensure that any events are delivered sequentially. That is, no
     * overlapping calls downstream. This can be used to turn a forked stream
     * (where for example a heavy conversion is done in multiple threads) back
     * into a sequential stream so a reduce is simple to do.
     *
     * @return Builder style (can be a new or the same object)
     */
    PushStream<T> sequential();

    /**
     * Coalesces a number of events into a new type of event. The input events
     * are forwarded to an accumulator function. This function returns an
     * Optional. If the optional is present, its value is sent downstream,
     * otherwise it is ignored.
     *
     * @param f The accumulator function
     * @return Builder style (can be a new or the same object)
     */
    <R> PushStream<R> coalesce(Function<? super T, Optional<R>> f);

    /**
     * Coalesces a number of events into a new type of event. A fixed number of
     * input events are forwarded to an accumulator function. This function
     * returns new event data to be forwarded on.
     *
     * @param count The fixed number of input events per accumulator call
     * @param f The accumulator function
     * @return Builder style (can be a new or the same object)
     */
    <R> PushStream<R> coalesce(int count, Function<Collection<T>, R> f);

    /**
     * Coalesces a number of events into a new type of event. A variable number
     * of input events are forwarded to an accumulator function. The number of
     * events to be forwarded is determined by calling the count function. The
     * accumulator function then returns new event data to be forwarded on.
     *
     * @param count The function called to determine the number of events
     * @param f The accumulator function
     * @return Builder style (can be a new or the same object)
     */
    <R> PushStream<R> coalesce(IntSupplier count,
            Function<Collection<T>, R> f);

    /**
     * Buffers a number of events over a fixed time interval and then forwards
     * the events to an accumulator function. This function returns new event
     * data to be forwarded on. Note that:
     * <ul>
     * <li>The collection forwarded to the accumulator function will be empty if
     * no events arrived during the time interval.</li>
     * <li>The accumulator function will be run and the forwarded event
     * delivered as a different task, (and therefore potentially on a different
     * thread) from the one that delivered the event to this {@link PushStream}.
     * </li>
     * <li>Due to the buffering and asynchronous delivery required, this method
     * prevents the propagation of back-pressure to earlier stages</li>
     * </ul>
     *
     * @param d The fixed time interval over which events are buffered
     * @param f The accumulator function
     * @return Builder style (can be a new or the same object)
     */
    <R> PushStream<R> window(Duration d, Function<Collection<T>, R> f);

    /**
     * Buffers a number of events over a fixed time interval and then forwards
     * the events to an accumulator function. This function returns new event
     * data to be forwarded on. Note that:
     * <ul>
     * <li>The collection forwarded to the accumulator function will be empty if
     * no events arrived during the time interval.</li>
     * <li>The accumulator function will be run and the forwarded event
     * delivered by a task given to the supplied executor.</li>
     * <li>Due to the buffering and asynchronous delivery required, this method
     * prevents the propagation of back-pressure to earlier stages</li>
     * </ul>
     *
     * @param d The fixed time interval over which events are buffered
     * @param executor The executor that is given the accumulator task
     * @param f The accumulator function
     * @return Builder style (can be a new or the same object)
     */
    <R> PushStream<R> window(Duration d, Executor executor,
            Function<Collection<T>, R> f);

    /**
     * Buffers a number of events over a variable time interval and then
     * forwards the events to an accumulator function. The length of time over
     * which events are buffered is determined by the time function. A maximum
     * number of events can also be requested, if this number of events is
     * reached then the accumulator will be called early. The accumulator
     * function returns new event data to be forwarded on. It is also given the
     * length of time for which the buffer accumulated data. This may be less
     * than the requested interval if the buffer reached the maximum number of
     * requested events early. Note that:
     * <ul>
     * <li>The collection forwarded to the accumulator function will be empty if
     * no events arrived during the time interval.</li>
     * <li>The accumulator function will be run and the forwarded event
     * delivered as a different task, (and therefore potentially on a different
     * thread) from the one that delivered the event to this {@link PushStream}.
     * </li>
     * <li>Due to the buffering and asynchronous delivery required, this method
     * prevents the propagation of back-pressure to earlier stages</li>
     * <li>If the window finishes by hitting the maximum number of events then
     * the remaining time in the window will be applied as back-pressure to the
     * previous stage, attempting to slow the producer to the expected windowing
     * threshold.</li>
     * </ul>
     *
     * @param timeSupplier The time function giving the buffering interval
     * @param maxEvents Supplies the maximum number of events per window
     * @param f The accumulator function; it receives the length of time for
     *            which the buffer accumulated data and the buffered events
     * @return Builder style (can be a new or the same object)
     */
    <R> PushStream<R> window(Supplier<Duration> timeSupplier,
            IntSupplier maxEvents, BiFunction<Long, Collection<T>, R> f);

    /**
     * Buffers a number of events over a variable time interval and then
     * forwards the events to an accumulator function. The length of time over
     * which events are buffered is determined by the time function. A maximum
     * number of events can also be requested, if this number of events is
     * reached then the accumulator will be called early. The accumulator
     * function returns new event data to be forwarded on. It is also given the
     * length of time for which the buffer accumulated data. This may be less
     * than the requested interval if the buffer reached the maximum number of
     * requested events early. Note that:
     * <ul>
     * <li>The collection forwarded to the accumulator function will be empty if
     * no events arrived during the time interval.</li>
     * <li>The accumulator function will be run and the forwarded event
     * delivered as a different task, (and therefore potentially on a different
     * thread) from the one that delivered the event to this {@link PushStream}.
     * </li>
     * <li>If the window finishes by hitting the maximum number of events then
     * the remaining time in the window will be applied as back-pressure to the
     * previous stage, attempting to slow the producer to the expected windowing
     * threshold.</li>
     * </ul>
     *
     * @param timeSupplier The time function giving the buffering interval
     * @param maxEvents Supplies the maximum number of events per window
     * @param executor The executor supplied for this stage (NOTE(review):
     *            presumably used to run the accumulator task, as in
     *            {@link #window(Duration, Executor, Function)} - confirm)
     * @param f The accumulator function; it receives the length of time for
     *            which the buffer accumulated data and the buffered events
     * @return Builder style (can be a new or the same object)
     */
    <R> PushStream<R> window(Supplier<Duration> timeSupplier,
            IntSupplier maxEvents, Executor executor,
            BiFunction<Long, Collection<T>, R> f);

    /**
     * Execute the action for each event received until the channel is closed.
     * This is a terminating method, the returned promise is resolved when the
     * channel closes.
     * <p>
     * This is a <strong>terminal operation</strong>
     *
     * @param action The action to perform
     * @return A promise that is resolved when the channel closes.
     */
    Promise<Void> forEach(Consumer<? super T> action);

    /**
     * Collect the payloads in an Object array after the channel is closed. This
     * is a terminating method, the returned promise is resolved when the
     * channel is closed.
     * <p>
     * This is a <strong>terminal operation</strong>
     *
     * @return A promise that is resolved with all the payloads received over
     *         the channel
     */
    Promise<Object[]> toArray();

    /**
     * Collect the payloads in an array after the channel is closed. This is a
     * terminating method, the returned promise is resolved when the channel is
     * closed. The type of the array is handled by the caller using a generator
     * function that gets the length of the desired array.
     * <p>
     * This is a <strong>terminal operation</strong>
     *
     * @param generator Creates the result array from the desired length
     * @return A promise that is resolved with all the payloads received over
     *         the channel
     */
    <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator);

    /**
     * Standard reduce, see Stream. The returned promise will be resolved when
     * the channel closes.
     * <p>
     * This is a <strong>terminal operation</strong>
     *
     * @param identity The identity/begin value
     * @param accumulator The accumulator
     * @return A promise that is resolved with the result of the reduction
     */
    Promise<T> reduce(T identity, BinaryOperator<T> accumulator);

    /**
     * Standard reduce without identity, so the return is an Optional. The
     * returned promise will be resolved when the channel closes.
     * <p>
     * This is a <strong>terminal operation</strong>
     *
     * @param accumulator The accumulator
     * @return A promise that is resolved with an Optional
     */
    Promise<Optional<T>> reduce(BinaryOperator<T> accumulator);

    /**
     * Standard reduce with identity, accumulator and combiner. The returned
     * promise will be resolved when the channel closes.
     * <p>
     * This is a <strong>terminal operation</strong>
     *
     * @param identity The identity/begin value
     * @param accumulator The accumulator
     * @param combiner combines two U's into one U (e.g. how to combine two
     *            lists)
     * @return The promise
     */
    <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator,
            BinaryOperator<U> combiner);

    /**
     * See Stream. Will resolve once the channel closes.
     * <p>
     * This is a <strong>terminal operation</strong>
     *
     * @param collector The collector that accumulates the payloads
     * @return A Promise representing the collected results
     */
    <R, A> Promise<R> collect(Collector<? super T, A, R> collector);

    /**
     * See Stream. Will resolve once the channel closes.
     * <p>
     * This is a <strong>terminal operation</strong>
     *
     * @param comparator The comparator used to order the values
     * @return A Promise representing the minimum value. NOTE(review): the
     *         earlier text said "null if no values are seen", which
     *         contradicts the Optional result type; presumably the Optional
     *         is empty in that case - confirm.
     */
    Promise<Optional<T>> min(Comparator<? super T> comparator);

    /**
     * See Stream. Will resolve once the channel closes.
     * <p>
     * This is a <strong>terminal operation</strong>
     *
     * @param comparator The comparator used to order the values
     * @return A Promise representing the maximum value. NOTE(review): the
     *         earlier text said "null if no values are seen", which
     *         contradicts the Optional result type; presumably the Optional
     *         is empty in that case - confirm.
     */
    Promise<Optional<T>> max(Comparator<? super T> comparator);

    /**
     * See Stream. Will resolve once the channel closes.
     * <p>
     * This is a <strong>terminal operation</strong>
     *
     * @return A Promise representing the number of values in the stream
     */
    Promise<Long> count();

    /**
     * Close the channel and resolve the promise with true when the predicate
     * matches a payload. If the channel is closed before the predicate matches,
     * the promise is resolved with false.
     * <p>
     * This is a <strong>short circuiting terminal operation</strong>
     *
     * @param predicate The predicate to test against each payload
     * @return A Promise that will resolve when an event matches the predicate,
     *         or the end of the stream is reached
     */
    Promise<Boolean> anyMatch(Predicate<? super T> predicate);

    /**
     * Close the channel and resolve the promise with false when the predicate
     * does not match a payload. If the channel is closed before, the promise
     * is resolved with true.
     * <p>
     * This is a <strong>short circuiting terminal operation</strong>
     *
     * @param predicate The predicate to test against each payload
     * @return A Promise that will resolve when an event fails to match the
     *         predicate, or the end of the stream is reached
     */
    Promise<Boolean> allMatch(Predicate<? super T> predicate);

    /**
     * Close the channel and resolve the promise with false when the predicate
     * matches any payload. If the channel is closed before, the promise is
     * resolved with true.
     * <p>
     * This is a <strong>short circuiting terminal operation</strong>
     *
     * @param predicate The predicate to test against each payload
     * @return A Promise that will resolve when an event matches the predicate,
     *         or the end of the stream is reached
     */
    Promise<Boolean> noneMatch(Predicate<? super T> predicate);

    /**
     * Close the channel and resolve the promise with the first element. If the
     * channel is closed before, the Optional will have no value.
     *
     * @return a promise
     */
    Promise<Optional<T>> findFirst();

    /**
     * Close the channel and resolve the promise with the first element. If the
     * channel is closed before, the Optional will have no value.
     * <p>
     * This is a <strong>terminal operation</strong>
     *
     * @return a promise
     */
    Promise<Optional<T>> findAny();

    /**
     * Pass on each event to another consumer until the stream is closed.
     * <p>
     * This is a <strong>terminal operation</strong>
     *
     * @param action The consumer that each event is passed on to
     * @return a promise
     */
    Promise<Long> forEachEvent(PushEventConsumer<? super T> action);

}
Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java?rev=1766040&view=auto ============================================================================== --- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java (added) +++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilder.java Fri Oct 21 15:10:51 2016 @@ -0,0 +1,67 @@ +/* + * Copyright (c) OSGi Alliance (2015). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; + +/** + * A Builder for a PushStream. This Builder extends the support of a standard + * BufferBuilder by allowing the PushStream to be unbuffered. + * + * + * @param <T> The type of objects in the {@link PushEvent} + * @param <U> The type of the Queue used in the user specified buffer + */ +public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? extends T>>> + extends BufferBuilder<PushStream<T>,T,U> { + + /** + * Tells this {@link PushStreamBuilder} to create an unbuffered stream which + * delivers events directly to its consumer using the incoming delivery + * thread. 
+ * + * @return the builder + */ + PushStreamBuilder<T,U> unbuffered(); + + /* + * Overridden methods to allow the covariant return of a PushStreamBuilder + */ + + @Override + PushStreamBuilder<T,U> withBuffer(U queue); + + @Override + PushStreamBuilder<T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy); + + @Override + PushStreamBuilder<T,U> withQueuePolicy(QueuePolicyOption queuePolicyOption); + + @Override + PushStreamBuilder<T,U> withPushbackPolicy( + PushbackPolicy<T,U> pushbackPolicy); + + @Override + PushStreamBuilder<T,U> withPushbackPolicy( + PushbackPolicyOption pushbackPolicyOption, long time); + + @Override + PushStreamBuilder<T,U> withParallelism(int parallelism); + + @Override + PushStreamBuilder<T,U> withExecutor(Executor executor); +} Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java?rev=1766040&view=auto ============================================================================== --- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java (added) +++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamBuilderImpl.java Fri Oct 21 15:10:51 2016 @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIESOR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; + +class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>> + extends AbstractBufferBuilder<PushStream<T>,T,U> + implements PushStreamBuilder<T,U> { + + private final PushStreamProvider psp; + private final PushEventSource<T> eventSource; + private final Executor previousExecutor; + + private boolean unbuffered; + + PushStreamBuilderImpl(PushStreamProvider psp, Executor defaultExecutor, + PushEventSource<T> eventSource) { + this.psp = psp; + this.previousExecutor = defaultExecutor; + this.eventSource = eventSource; + this.worker = defaultExecutor; + } + + @Override + public PushStreamBuilder<T,U> withBuffer(U queue) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withBuffer(queue); + } + + @Override + public PushStreamBuilder<T,U> withQueuePolicy( + QueuePolicy<T,U> queuePolicy) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withQueuePolicy(queuePolicy); + } + + @Override + public PushStreamBuilder<T,U> withQueuePolicy( + QueuePolicyOption queuePolicyOption) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withQueuePolicy( + queuePolicyOption); + } + + @Override + public PushStreamBuilder<T,U> withPushbackPolicy( + PushbackPolicy<T,U> pushbackPolicy) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withPushbackPolicy( + pushbackPolicy); + } + + @Override + public PushStreamBuilder<T,U> withPushbackPolicy( + PushbackPolicyOption 
pushbackPolicyOption, long time) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withPushbackPolicy( + pushbackPolicyOption, time); + } + + @Override + public PushStreamBuilder<T,U> withParallelism(int parallelism) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withParallelism(parallelism); + } + + @Override + public PushStreamBuilder<T,U> withExecutor(Executor executor) { + unbuffered = false; + return (PushStreamBuilder<T,U>) super.withExecutor(executor); + } + + @Override + public PushStreamBuilder<T,U> unbuffered() { + unbuffered = true; + return this; + } + + @Override + public PushStream<T> create() { + if (unbuffered) { + return psp.createUnbufferedStream(eventSource, previousExecutor); + } else { + return psp.createStream(eventSource, concurrency, worker, buffer, + bufferingPolicy, backPressure); + } + } +} Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java?rev=1766040&view=auto ============================================================================== --- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java (added) +++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushStreamProvider.java Fri Oct 21 15:10:51 2016 @@ -0,0 +1,586 @@ +/* + * Copyright (c) OSGi Alliance (2015). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.aries.pushstream.AbstractPushStreamImpl.State.CLOSED; +import static org.osgi.util.pushstream.PushEvent.data; +import static org.osgi.util.pushstream.PushEvent.error; +import static org.osgi.util.pushstream.PushbackPolicyOption.LINEAR; +import static org.osgi.util.pushstream.QueuePolicyOption.FAIL; + +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import java.util.stream.Stream; + +import org.apache.aries.pushstream.BufferedPushStreamImpl; +import org.apache.aries.pushstream.SimplePushEventSourceImpl; +import org.apache.aries.pushstream.UnbufferedPushStreamImpl; + +/** + * A factory for {@link PushStream} instances, and utility methods for handling + * {@link PushEventSource}s and {@link PushEventConsumer}s + */ +public final class PushStreamProvider { + + private final Lock lock = new ReentrantLock(true); + + private int schedulerReferences; + + private ScheduledExecutorService scheduler; + + private ScheduledExecutorService acquireScheduler() { + try { + lock.lockInterruptibly(); + try { + schedulerReferences += 1; + + if (schedulerReferences == 1) { + scheduler = Executors.newSingleThreadScheduledExecutor(); + } + return scheduler; + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + throw new IllegalStateException("Unable to acquire the Scheduler", + e); + } + } + + private void 
releaseScheduler() { + try { + lock.lockInterruptibly(); + try { + schedulerReferences -= 1; + + if (schedulerReferences == 0) { + scheduler.shutdown(); + scheduler = null; + } + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + /** + * Create a stream with the default configured buffer, executor size, queue, + * queue policy and pushback policy. This is equivalent to calling + * + * <code> + * buildStream(source).create(); + * </code> + * + * <p> + * This stream will be buffered from the event producer, and will honour + * back pressure even if the source does not. + * + * <p> + * Buffered streams are useful for "bursty" event sources which produce a + * number of events close together, then none for some time. These bursts + * can sometimes overwhelm downstream processors. Buffering will not, + * however, protect downstream components from a source which produces + * events faster (on average) than they can be consumed. + * + * <p> + * Event delivery will not begin until a terminal operation is reached on + * the chain of AsyncStreams. Once a terminal operation is reached the + * stream will be connected to the event source. + * + * @param eventSource + * @return A {@link PushStream} with a default initial buffer + */ + public <T> PushStream<T> createStream(PushEventSource<T> eventSource) { + return createStream(eventSource, 1, null, new ArrayBlockingQueue<>(32), + FAIL.getPolicy(), LINEAR.getPolicy(1000)); + } + + /** + * Builds a push stream with custom configuration. + * + * <p> + * + * The resulting {@link PushStream} may be buffered or unbuffered depending + * on how it is configured. + * + * @param eventSource The source of the events + * + * @return A {@link PushStreamBuilder} for the stream + */ + public <T, U extends BlockingQueue<PushEvent< ? 
extends T>>> PushStreamBuilder<T,U> buildStream( + PushEventSource<T> eventSource) { + return new PushStreamBuilderImpl<T,U>(this, null, eventSource); + } + + @SuppressWarnings({ + "rawtypes", "unchecked" + }) + <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStream<T> createStream( + PushEventSource<T> eventSource, int parallelism, Executor executor, + U queue, QueuePolicy<T,U> queuePolicy, + PushbackPolicy<T,U> pushbackPolicy) { + + if (eventSource == null) { + throw new NullPointerException("There is no source of events"); + } + + if (parallelism < 0) { + throw new IllegalArgumentException( + "The supplied parallelism cannot be less than zero. It was " + + parallelism); + } else if (parallelism == 0) { + parallelism = 1; + } + + boolean closeExecutorOnClose; + Executor toUse; + if (executor == null) { + toUse = Executors.newFixedThreadPool(parallelism); + closeExecutorOnClose = true; + } else { + toUse = executor; + closeExecutorOnClose = false; + } + + if (queue == null) { + queue = (U) new ArrayBlockingQueue(32); + } + + if (queuePolicy == null) { + queuePolicy = FAIL.getPolicy(); + } + + if (pushbackPolicy == null) { + pushbackPolicy = LINEAR.getPolicy(1000); + } + + @SuppressWarnings("resource") + PushStream<T> stream = new BufferedPushStreamImpl<>(this, + acquireScheduler(), queue, parallelism, toUse, queuePolicy, + pushbackPolicy, aec -> { + try { + return eventSource.open(aec); + } catch (Exception e) { + throw new RuntimeException( + "Unable to connect to event source", e); + } + }); + + stream = stream.onClose(() -> { + if (closeExecutorOnClose) { + ((ExecutorService) toUse).shutdown(); + } + releaseScheduler(); + }).map(Function.identity()); + return stream; + } + + <T> PushStream<T> createUnbufferedStream(PushEventSource<T> eventSource, + Executor executor) { + + boolean closeExecutorOnClose; + Executor toUse; + if (executor == null) { + toUse = Executors.newFixedThreadPool(2); + closeExecutorOnClose = true; + } else { + toUse = executor; + 
closeExecutorOnClose = false; + } + + @SuppressWarnings("resource") + PushStream<T> stream = new UnbufferedPushStreamImpl<>(this, toUse, + acquireScheduler(), aec -> { + try { + return eventSource.open(aec); + } catch (Exception e) { + throw new RuntimeException( + "Unable to connect to event source", e); + } + }); + + stream = stream.onClose(() -> { + if (closeExecutorOnClose) { + ((ExecutorService) toUse).shutdown(); + } + releaseScheduler(); + }).map(Function.identity()); + + return stream; + } + + /** + * Convert an {@link PushStream} into an {@link PushEventSource}. The first + * call to {@link PushEventSource#open(PushEventConsumer)} will begin event + * processing. + * + * The {@link PushEventSource} will remain active until the backing stream + * is closed, and permits multiple consumers to + * {@link PushEventSource#open(PushEventConsumer)} it. + * + * This is equivalent to: <code> + * buildEventSourceFromStream(stream).create(); + * </code> + * + * @param stream + * @return a {@link PushEventSource} backed by the {@link PushStream} + */ + public <T> PushEventSource<T> createEventSourceFromStream( + PushStream<T> stream) { + return buildEventSourceFromStream(stream).create(); + } + + /** + * Convert an {@link PushStream} into an {@link PushEventSource}. The first + * call to {@link PushEventSource#open(PushEventConsumer)} will begin event + * processing. + * + * The {@link PushEventSource} will remain active until the backing stream + * is closed, and permits multiple consumers to + * {@link PushEventSource#open(PushEventConsumer)} it. + * + * @param stream + * + * @return a {@link PushEventSource} backed by the {@link PushStream} + */ + public <T, U extends BlockingQueue<PushEvent< ? 
extends T>>> BufferBuilder<PushEventSource<T>,T,U> buildEventSourceFromStream( + PushStream<T> stream) { + return new AbstractBufferBuilder<PushEventSource<T>,T,U>() { + @Override + public PushEventSource<T> create() { + SimplePushEventSource<T> spes = createSimplePushEventSource( + concurrency, worker, buffer, bufferingPolicy, () -> { + try { + stream.close(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }); + spes.connectPromise() + .then(p -> stream.forEach(t -> spes.publish(t)) + .onResolve(() -> spes.close())); + return spes; + } + }; + } + + + /** + * Create a {@link SimplePushEventSource} with the supplied type and default + * buffering behaviours. The SimplePushEventSource will respond to back + * pressure requests from the consumers connected to it. + * + * This is equivalent to: <code> + * buildSimpleEventSource(type).create(); + * </code> + * + * @param type + * @return a {@link SimplePushEventSource} + */ + public <T> SimplePushEventSource<T> createSimpleEventSource(Class<T> type) { + return createSimplePushEventSource(1, null, + new ArrayBlockingQueue<>(32), + FAIL.getPolicy(), () -> { /* Nothing else to do */ }); + } + + /** + * + * Build a {@link SimplePushEventSource} with the supplied type and custom + * buffering behaviours. The SimplePushEventSource will respond to back + * pressure requests from the consumers connected to it. + * + * @param type + * + * @return a {@link SimplePushEventSource} + */ + + public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<SimplePushEventSource<T>,T,U> buildSimpleEventSource( + Class<T> type) { + return new AbstractBufferBuilder<SimplePushEventSource<T>,T,U>() { + @Override + public SimplePushEventSource<T> create() { + return createSimplePushEventSource(concurrency, worker, buffer, + bufferingPolicy, () -> { /* Nothing else to do */ }); + } + }; + } + + @SuppressWarnings({ + "unchecked", "rawtypes" + }) + <T, U extends BlockingQueue<PushEvent< ? 
extends T>>> SimplePushEventSource<T> createSimplePushEventSource( + int parallelism, Executor executor, U queue, + QueuePolicy<T,U> queuePolicy, Runnable onClose) { + + if (parallelism < 0) { + throw new IllegalArgumentException( + "The supplied parallelism cannot be less than zero. It was " + + parallelism); + } else if (parallelism == 0) { + parallelism = 1; + } + + boolean closeExecutorOnClose; + Executor toUse; + if (executor == null) { + toUse = Executors.newFixedThreadPool(2); + closeExecutorOnClose = true; + } else { + toUse = executor; + closeExecutorOnClose = false; + } + + if (queue == null) { + queue = (U) new ArrayBlockingQueue(32); + } + + if (queuePolicy == null) { + queuePolicy = FAIL.getPolicy(); + } + + SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>( + toUse, acquireScheduler(), queuePolicy, queue, parallelism, + () -> { + try { + onClose.run(); + } catch (Exception e) { + // TODO log this? + } + if (closeExecutorOnClose) { + ((ExecutorService) toUse).shutdown(); + } + releaseScheduler(); + }); + return spes; + } + + /** + * Create a buffered {@link PushEventConsumer} with the default configured + * buffer, executor size, queue, queue policy and pushback policy. This is + * equivalent to calling + * + * <code> + * buildBufferedConsumer(delegate).create(); + * </code> + * + * <p> + * The returned consumer will be buffered from the event source, and will + * honour back pressure requests from its delegate even if the event source + * does not. + * + * <p> + * Buffered consumers are useful for "bursty" event sources which produce a + * number of events close together, then none for some time. These bursts + * can sometimes overwhelm the consumer. Buffering will not, however, + * protect downstream components from a source which produces events faster + * than they can be consumed. 
+ * + * @param delegate + * @return a {@link PushEventConsumer} with a buffer directly before it + */ + public <T> PushEventConsumer<T> createBufferedConsumer( + PushEventConsumer<T> delegate) { + return buildBufferedConsumer(delegate).create(); + } + + /** + * Build a buffered {@link PushEventConsumer} with custom configuration. + * <p> + * The returned consumer will be buffered from the event source, and will + * honour back pressure requests from its delegate even if the event source + * does not. + * <p> + * Buffered consumers are useful for "bursty" event sources which produce a + * number of events close together, then none for some time. These bursts + * can sometimes overwhelm the consumer. Buffering will not, however, + * protect downstream components from a source which produces events faster + * than they can be consumed. + * <p> + * Buffers are also useful as "circuit breakers". If a + * {@link QueuePolicyOption#FAIL} is used then a full buffer will request + * that the stream close, preventing an event storm from reaching the + * client. + * <p> + * Note that this buffered consumer will close when it receives a terminal + * event, or if the delegate returns negative backpressure. No further + * events will be propagated after this time. + * + * @param delegate + * @return a {@link PushEventConsumer} with a buffer directly before it + */ + public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventConsumer<T>,T,U> buildBufferedConsumer( + PushEventConsumer<T> delegate) { + return new AbstractBufferBuilder<PushEventConsumer<T>,T,U>() { + @Override + public PushEventConsumer<T> create() { + PushEventPipe<T> pipe = new PushEventPipe<>(); + + createStream(pipe, concurrency, worker, buffer, bufferingPolicy, backPressure) + .forEachEvent(delegate); + + return pipe; + } + }; + } + + static final class PushEventPipe<T> + implements PushEventConsumer<T>, PushEventSource<T> { + + volatile PushEventConsumer< ? 
super T> delegate; + + @Override + public AutoCloseable open(PushEventConsumer< ? super T> pec) + throws Exception { + return () -> { /* Nothing else to do */ }; + } + + @Override + public long accept(PushEvent< ? extends T> event) throws Exception { + return delegate.accept(event); + } + + } + + /** + * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The + * data from the stream will be pushed into the PushStream synchronously as + * it is opened. This may make terminal operations blocking unless a buffer + * has been added to the {@link PushStream}. Care should be taken with + * infinite {@link Stream}s to avoid blocking indefinitely. + * + * @param items The items to push into the PushStream + * @return A PushStream containing the items from the Java Stream + */ + public <T> PushStream<T> streamOf(Stream<T> items) { + PushEventSource<T> pes = aec -> { + AtomicBoolean closed = new AtomicBoolean(false); + + items.mapToLong(i -> { + try { + long returnValue = closed.get() ? -1 : aec.accept(data(i)); + if (returnValue < 0) { + aec.accept(PushEvent.<T>close()); + } + return returnValue; + } catch (Exception e) { + try { + aec.accept(PushEvent.<T>error(e)); + } catch (Exception e2) {/* No further events needed */} + return -1; + } + }).filter(i -> i < 0).findFirst().orElseGet(() -> { + try { + return aec.accept(PushEvent.<T>close()); + } catch (Exception e) { + return -1; + } + }); + + return () -> closed.set(true); + }; + + return this.<T> createUnbufferedStream(pes, null); + } + + /** + * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The + * data from the stream will be pushed into the PushStream asynchronously + * using the supplied Executor. 
+ * + * @param executor The worker to use to push items from the Stream into the + * PushStream + * @param items The items to push into the PushStream + * @return A PushStream containing the items from the Java Stream + */ + public <T> PushStream<T> streamOf(Executor executor, Stream<T> items) { + + boolean closeExecutorOnClose; + Executor toUse; + if (executor == null) { + toUse = Executors.newFixedThreadPool(2); + closeExecutorOnClose = true; + } else { + toUse = executor; + closeExecutorOnClose = false; + } + + @SuppressWarnings("resource") + PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>( + this, toUse, acquireScheduler(), aec -> { + return () -> { /* No action to take */ }; + }) { + + @Override + protected boolean begin() { + if (super.begin()) { + Iterator<T> it = items.iterator(); + + toUse.execute(() -> pushData(it)); + + return true; + } + return false; + } + + private void pushData(Iterator<T> it) { + while (it.hasNext()) { + try { + long returnValue = closed.get() == CLOSED ? 
-1 + : handleEvent(data(it.next())); + if (returnValue != 0) { + if (returnValue < 0) { + close(); + return; + } else { + scheduler.schedule( + () -> toUse.execute(() -> pushData(it)), + returnValue, MILLISECONDS); + return; + } + } + } catch (Exception e) { + close(error(e)); + } + } + close(); + } + }; + + stream = stream.onClose(() -> { + if (closeExecutorOnClose) { + ((ExecutorService) toUse).shutdown(); + } + releaseScheduler(); + }).map(Function.identity()); + + return stream; + } +} Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java?rev=1766040&view=auto ============================================================================== --- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java (added) +++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicy.java Fri Oct 21 15:10:51 2016 @@ -0,0 +1,48 @@ +/* + * Copyright (c) OSGi Alliance (2015). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; +import org.osgi.annotation.versioning.ConsumerType; + +/** + * A {@link PushbackPolicy} is used to calculate how much back pressure to apply + * based on the current buffer. 
The {@link PushbackPolicy} will be called after + * an event has been queued, and the returned value will be used as back + * pressure. + * + * @see PushbackPolicyOption + * + * + * @param <T> The type of the data + * @param <U> The type of the queue + */ +@ConsumerType +@FunctionalInterface +public interface PushbackPolicy<T, U extends BlockingQueue<PushEvent<? extends T>>> { + + /** + * Given the current state of the queue, determine the level of back + * pressure that should be applied. + * + * @param queue the buffer whose current state is used to determine the back pressure + * @return a back pressure value in nanoseconds (NOTE(review): PushStreamProvider.streamOf(Executor, Stream) schedules positive back pressure with TimeUnit.MILLISECONDS - confirm which unit is intended) + * @throws Exception if the back pressure cannot be determined + */ + public long pushback(U queue) throws Exception; + +} Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java?rev=1766040&view=auto ============================================================================== --- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java (added) +++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/PushbackPolicyOption.java Fri Oct 21 15:10:51 2016 @@ -0,0 +1,98 @@ +/* + * Copyright (c) OSGi Alliance (2015). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * {@link PushbackPolicyOption} provides a standard set of simple + * {@link PushbackPolicy} implementations. + * + * @see PushbackPolicy + */ +public enum PushbackPolicyOption { + + /** + * Returns a fixed amount of back pressure, independent of how full the + * buffer is + */ + FIXED { + @Override + public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) { + return q -> value; + } + }, + /** + * Returns zero back pressure until the buffer is full, then it returns a + * fixed value + */ + ON_FULL_FIXED { + @Override + public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) { + return q -> q.remainingCapacity() == 0 ? value : 0; + } + }, + /** + * Returns zero back pressure until the buffer is full, then it returns an + * exponentially increasing amount, starting with the supplied value and + * doubling it each time. Once the buffer is no longer full the back + * pressure returns to zero. + */ + ON_FULL_EXPONENTIAL { + @Override + public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) { + AtomicInteger backoffCount = new AtomicInteger(0); + return q -> { + if (q.remainingCapacity() == 0) { + return value << backoffCount.getAndIncrement(); + } + backoffCount.set(0); + return 0; + }; + + } + }, + /** + * Returns zero back pressure when the buffer is empty, then it returns a + * linearly increasing amount of back pressure based on how full the buffer + * is. The maximum value will be returned when the buffer is full. + */ + LINEAR { + @Override + public <T, U extends BlockingQueue<PushEvent<? 
extends T>>> PushbackPolicy<T, U> getPolicy(long value) { + return q -> { + long remainingCapacity = q.remainingCapacity(); + long used = q.size(); + return (value * used) / (used + remainingCapacity); + }; + } + }; + + /** + * Create a {@link PushbackPolicy} instance configured with a base back + * pressure time in nanoseconds + * + * The actual backpressure returned will vary based on the selected + * implementation, the base value, and the state of the buffer. + * + * @param value + * @return A {@link PushbackPolicy} to use + */ + public abstract <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value); + +} Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java?rev=1766040&view=auto ============================================================================== --- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java (added) +++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicy.java Fri Oct 21 15:10:51 2016 @@ -0,0 +1,52 @@ +/* + * Copyright (c) OSGi Alliance (2015). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; + +import org.osgi.annotation.versioning.ConsumerType; +import org.osgi.util.pushstream.PushEvent.EventType; + +/** + * A {@link QueuePolicy} is used to control how events should be queued in the + * current buffer. The {@link QueuePolicy} will be called when an event has + * arrived. + * + * @see QueuePolicyOption + * + * + * @param <T> The type of the data + * @param <U> The type of the queue + */ + +@ConsumerType +@FunctionalInterface +public interface QueuePolicy<T, U extends BlockingQueue<PushEvent<? extends T>>> { + + /** + * Enqueue the event. This method does not return a value; failure to + * enqueue is reported by throwing an exception. + * + * @param queue the buffer to add the event to + * @param event the event that has arrived + * @throws Exception If an error occurred adding the event to the queue. This + * exception will cause the connection between the + * {@link PushEventSource} and the {@link PushEventConsumer} to be + * closed with an {@link EventType#ERROR} + */ + public void doOffer(U queue, PushEvent<? extends T> event) throws Exception; + +} Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java?rev=1766040&view=auto ============================================================================== --- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java (added) +++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/QueuePolicyOption.java Fri Oct 21 15:10:51 2016 @@ -0,0 +1,76 @@ +/* + * Copyright (c) OSGi Alliance (2015). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import java.util.concurrent.BlockingQueue; + +/** + * {@link QueuePolicyOption} provides a standard set of simple + * {@link QueuePolicy} implementations. + * + * @see QueuePolicy + */ +public enum QueuePolicyOption { + /** + * Attempt to add the supplied event to the queue. If the queue is unable to + * immediately accept the value then discard the value at the head of the + * queue and try again. Repeat this process until the event is enqueued. + */ + DISCARD_OLDEST { + @Override + public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() { + return (queue, event) -> { + while (!queue.offer(event)) { + queue.poll(); + } + }; + } + }, + /** + * Attempt to add the supplied event to the queue, blocking until the + * enqueue is successful. + */ + BLOCK { + @Override + public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() { + return (queue, event) -> { + try { + queue.put(event); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }; + } + }, + /** + * Attempt to add the supplied event to the queue, throwing an exception if + * the queue is full. + */ + FAIL { + @Override + public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() { + return (queue, event) -> queue.add(event); + } + }; + + /** + * @return a {@link QueuePolicy} implementation + */ + public abstract <T, U extends BlockingQueue<PushEvent<? 
extends T>>> QueuePolicy<T, U> getPolicy(); + +} Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java?rev=1766040&view=auto ============================================================================== --- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java (added) +++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/SimplePushEventSource.java Fri Oct 21 15:10:51 2016 @@ -0,0 +1,104 @@ +/* + * Copyright (c) OSGi Alliance (2015). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.util.pushstream; + +import org.osgi.annotation.versioning.ProviderType; +import org.osgi.util.promise.Promise; + +/** + * A {@link SimplePushEventSource} is a helper that makes it simpler to write a + * {@link PushEventSource}. Users do not need to manage multiple registrations + * to the stream, nor do they have to be concerned with back pressure. + * + * @param <T> The type of the events produced by this source + */ +@ProviderType +public interface SimplePushEventSource<T> + extends PushEventSource<T>, AutoCloseable { + /** + * Close this source. Calling this method indicates that there will never be + * any more events published by it. 
Calling this method sends a close event + * to all connected consumers. After calling this method any + * {@link PushEventConsumer} that tries to {@link #open(PushEventConsumer)} + * this source will immediately receive a close event. + */ + @Override + void close(); + + /** + * Asynchronously publish an event to this stream and all connected + * {@link PushEventConsumer} instances. When this method returns there is no + * guarantee that all consumers have been notified. Events published by a + * single thread will maintain their relative ordering, however they may be + * interleaved with events from other threads. + * + * @param t the event data to publish + * @throws IllegalStateException if the source is closed + */ + void publish(T t); + + /** + * Close this source for now, but potentially reopen it later. Calling this + * method asynchronously sends a close event to all connected consumers. + * After calling this method any {@link PushEventConsumer} that wishes may + * {@link #open(PushEventConsumer)} this source, and will receive subsequent + * events. + */ + void endOfStream(); + + /** + * Close this source for now, but potentially reopen it later. Calling this + * method asynchronously sends an error event to all connected consumers. + * After calling this method any {@link PushEventConsumer} that wishes may + * {@link #open(PushEventConsumer)} this source, and will receive subsequent + * events. + * + * @param e the error + */ + void error(Exception e); + + /** + * Determine whether there are any {@link PushEventConsumer}s for this + * {@link PushEventSource}. This can be used to skip expensive event + * creation logic when there are no listeners. + * + * @return true if any consumers are currently connected + */ + boolean isConnected(); + + /** + * This method can be used to delay event generation until an event consumer + * has connected. The returned promise will resolve as soon as one or more + * {@link PushEventConsumer} instances have opened the + * SimplePushEventSource.
+ * <p> + * The returned promise may already be resolved if this + * {@link SimplePushEventSource} already has connected consumers. If the + * {@link SimplePushEventSource} is closed before the returned Promise + * resolves then it will be failed with an {@link IllegalStateException}. + * <p> + * Note that the connected consumers are able to asynchronously close their + * connections to this {@link SimplePushEventSource}, and therefore it is + * possible that once the promise resolves this + * {@link SimplePushEventSource} may no longer be connected to any + * consumers. + * + * @return A promise representing the connection state of this EventSource + */ + Promise<Void> connectPromise(); + +} Added: aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java URL: http://svn.apache.org/viewvc/aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java?rev=1766040&view=auto ============================================================================== --- aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java (added) +++ aries/trunk/pushstream/pushstream/src/main/java/org/osgi/util/pushstream/package-info.java Fri Oct 21 15:10:51 2016 @@ -0,0 +1,39 @@ +/* + * Copyright (c) OSGi Alliance (2015). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * Push Stream Package Version 1.0.
+ * + * <p> + * Bundles wishing to use this package must list the package in the + * Import-Package header of the bundle's manifest. + * + * <p> + * Example import for consumers using the API in this package: + * <p> + * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,2.0)"} + * <p> + * Example import for providers implementing the API in this package: + * <p> + * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,1.1)"} + * + * @author $Id: 6a28fa0b5c2036486a22a7ca1254729d7848ca43 $ + */ + +@Version("1.0") +package org.osgi.util.pushstream; + +import org.osgi.annotation.versioning.Version;
