Author: csierra
Date: Fri Nov 17 16:00:41 2017
New Revision: 1815581

URL: http://svn.apache.org/viewvc?rev=1815581&view=rev
Log:
[Component-DSL] Refactor and source formatting

Added:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java
      - copied, changed from r1815580, 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java
      - copied, changed from r1815580, 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
Removed:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Event.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/HighestsPerTransformer.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/UtilTest.java
Modified:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
    
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/CachingServiceReference.java
 Fri Nov 17 16:00:41 2017
@@ -31,9 +31,6 @@ import java.util.stream.Collectors;
 public class CachingServiceReference<T>
     implements Comparable<CachingServiceReference<T>> {
 
-    private final ConcurrentHashMap<String, Object> _properties;
-    private final ServiceReference<T> _serviceReference;
-
     public CachingServiceReference(ServiceReference<T> serviceReference) {
         _properties = new ConcurrentHashMap<>();
         _serviceReference = serviceReference;
@@ -140,12 +137,6 @@ public class CachingServiceReference<T>
      */
     public ServiceReference<T> getServiceReference() {
         return _serviceReference;
-    }    @Override
-    public String toString() {
-        return "CachingServiceReference{" +
-            "cachedProperties=" + _properties + ", " +
-            "serviceReference=" + _serviceReference +
-            '}';
     }
 
     @Override
@@ -162,7 +153,6 @@ public class CachingServiceReference<T>
 
         return _serviceReference.equals(that._serviceReference);
     }
-
     /**
      * Checks if any of the cached properties has a different value in the
      * underlying {@link ServiceReference}. Only properties that have been
@@ -180,6 +170,14 @@ public class CachingServiceReference<T>
         );
     }
 
+    @Override
+    public String toString() {
+        return "CachingServiceReference{" +
+            "cachedProperties=" + _properties + ", " +
+            "serviceReference=" + _serviceReference +
+            '}';
+    }
+
     /**
      * Checks if the property is dirty in this instance without caching the
      * value. Trying to do the same using getProperty would cache the property
@@ -194,6 +192,9 @@ public class CachingServiceReference<T>
             !value.equals(_serviceReference.getProperty(key));
     }
 
+    private final ConcurrentHashMap<String, Object> _properties;
+    private final ServiceReference<T> _serviceReference;
+
     private static class NULL {
         private static NULL INSTANCE = new NULL();
 
@@ -208,6 +209,4 @@ public class CachingServiceReference<T>
         }
     }
 
-
-
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGi.java
 Fri Nov 17 16:00:41 2017
@@ -59,42 +59,12 @@ import java.util.function.Supplier;
 public interface OSGi<T> extends OSGiRunnable<T> {
        Runnable NOOP = () -> {};
 
-       <S> OSGi<S> choose(
-               Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then,
-               Function<OSGi<T>, OSGi<S>> otherwise);
-
-       <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs);
-
-       <K, S> OSGi<S> splitBy(
-               Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun);
-
-       OSGi<T> recover(BiFunction<T, Exception, T> onError);
-
-       OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError);
-
-       OSGi<T> effects(
-               Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
-
-       <S> OSGi<S> map(Function<? super T, ? extends S> function);
-
-       <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun);
-
-       <S> OSGi<S> then(OSGi<S> next);
-
-       OSGi<Void> foreach(Consumer<? super T> onAdded);
-
-       OSGi<Void> foreach(
-               Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
-
-       <S> OSGi<S> transform(
-               Function<Function<S, Runnable>, Function<T, Runnable>> fun);
-
-       static OSGi<Void> ignore(OSGi<?> program) {
-               return new IgnoreImpl(program);
+       @SafeVarargs
+       static <T> OSGi<T> all(OSGi<T> ... programs) {
+               return new AllOSGi<>(programs);
        }
 
        static OSGi<BundleContext> bundleContext() {
-
                return new BundleContextOSGiImpl();
        }
 
@@ -108,6 +78,26 @@ public interface OSGi<T> extends OSGiRun
                return new ChangeContextOSGiImpl<>(program, bundleContext);
        }
 
+       static <A, B, C> OSGi<C> combine(Function2<A, B, C> fun, OSGi<A> a, OSGi<B> b) {
+               return b.applyTo(a.applyTo(just(fun.curried())));
+       }
+
+       static <A, B, C, D> OSGi<D> combine(Function3<A, B, C, D> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c) {
+               return c.applyTo(OSGi.combine((A aa, B bb) -> fun.curried().apply(aa).apply(bb), a, b));
+       }
+
+       static <A, B, C, D, E> OSGi<E> combine(Function4<A, B, C, D, E> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d) {
+               return d.applyTo(OSGi.combine((A aa, B bb, C cc) -> fun.curried().apply(aa).apply(bb).apply(cc), a, b, c));
+       }
+
+       static <A, B, C, D, E, F> OSGi<F> combine(Function5<A, B, C, D, E, F> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d, OSGi<E> e) {
+               return e.applyTo(OSGi.combine((A aa, B bb, C cc, D dd) -> fun.curried().apply(aa).apply(bb).apply(cc).apply(dd), a, b, c, d));
+       }
+
+       static <A, B, C, D, E, F, G> OSGi<G> combine(Function6<A, B, C, D, E, F, G> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d, OSGi<E> e, OSGi<F> f) {
+               return f.applyTo(OSGi.combine((A aa, B bb, C cc, D dd, E ee) -> fun.curried().apply(aa).apply(bb).apply(cc).apply(dd).apply(ee), a, b, c, d, e));
+       }
+
        static OSGi<Dictionary<String, ?>> configuration(String pid) {
                return new ConfigurationOSGiImpl(pid);
        }
@@ -116,6 +106,14 @@ public interface OSGi<T> extends OSGiRun
                return new ConfigurationsOSGiImpl(factoryPid);
        }
 
+       static OSGi<Void> ignore(OSGi<?> program) {
+               return new IgnoreImpl(program);
+       }
+
+       static <S> OSGi<S> join(OSGi<OSGi<S>> program) {
+               return program.flatMap(x -> x);
+       }
+
        static <S> OSGi<S> just(S s) {
                return new JustOSGiImpl<>(s);
        }
@@ -128,10 +126,6 @@ public interface OSGi<T> extends OSGiRun
                return new JustOSGiImpl<>(() -> Collections.singletonList(s.get()));
        }
 
-       static <S> OSGi<S> join(OSGi<OSGi<S>> program) {
-               return program.flatMap(x -> x);
-       }
-
        static <S> OSGi<S> nothing() {
                return new NothingOSGiImpl<>();
        }
@@ -140,6 +134,28 @@ public interface OSGi<T> extends OSGiRun
                return new OnCloseOSGiImpl(action);
        }
 
+       static <T> OSGi<T> once(OSGi<T> program) {
+               return program.transform(op -> {
+                       AtomicInteger count = new AtomicInteger();
+
+                       AtomicReference<Runnable> terminator = new AtomicReference<>();
+
+                       return t -> {
+                               if (count.getAndIncrement() == 0) {
+                                       terminator.set(op.apply(t));
+                               }
+
+                               return () -> {
+                                       if (count.decrementAndGet() == 0) {
+                                               Runnable runnable = terminator.getAndSet(NOOP);
+
+                                               runnable.run();
+                                       }
+                               };
+                       };
+               });
+       }
+
        static OSGi<ServiceObjects<Object>> prototypes(String filterString) {
                return prototypes(null, filterString);
        }
@@ -180,55 +196,6 @@ public interface OSGi<T> extends OSGiRun
                return new ServiceRegistrationOSGiImpl(classes, service, properties);
        }
 
-       static <T> OSGi<T> services(Class<T> clazz) {
-               return services(clazz, null);
-       }
-
-       static <T> OSGi<Object> services(String filterString) {
-               return services(null, filterString);
-       }
-
-       static <T> OSGi<T> services(Class<T> clazz, String filterString) {
-               return
-                       bundleContext().flatMap(
-                       bundleContext ->
-
-                       serviceReferences(clazz, filterString).map(
-                               CachingServiceReference::getServiceReference
-                       ).flatMap(
-                               sr -> {
-                                       T service = bundleContext.getService(sr);
-
-                                       return
-                                               onClose(() -> bundleContext.ungetService(sr)).then(
-                                               just(service)
-                                       );
-                       }
-               ));
-       }
-
-       public static <T> OSGi<T> once(OSGi<T> program) {
-               return program.transform(op -> {
-                       AtomicInteger count = new AtomicInteger();
-
-                       AtomicReference<Runnable> terminator = new AtomicReference<>();
-
-                       return t -> {
-                               if (count.getAndIncrement() == 0) {
-                                       terminator.set(op.apply(t));
-                               }
-
-                               return () -> {
-                                       if (count.decrementAndGet() == 0) {
-                                               Runnable runnable = terminator.getAndSet(NOOP);
-
-                                               runnable.run();
-                                       }
-                               };
-                       };
-               });
-       }
-
        static <T> OSGi<CachingServiceReference<T>> serviceReferences(
                Class<T> clazz) {
 
@@ -267,35 +234,66 @@ public interface OSGi<T> extends OSGiRun
                return new ServiceReferenceOSGi<>(filterString, null, onModified);
        }
 
-       @SafeVarargs
-       static <T> OSGi<T> all(OSGi<T> ... programs) {
-               return new AllOSGi<>(programs);
+       static <T> OSGi<T> services(Class<T> clazz) {
+               return services(clazz, null);
        }
 
-       OSGi<T> filter(Predicate<T> predicate);
-
-       public default <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
-               return fun.flatMap(this::map);
+       static <T> OSGi<Object> services(String filterString) {
+               return services(null, filterString);
        }
 
-       public static <A, B, C> OSGi<C> combine(Function2<A, B, C> fun, OSGi<A> a, OSGi<B> b) {
-               return b.applyTo(a.applyTo(just(fun.curried())));
-       }
+       static <T> OSGi<T> services(Class<T> clazz, String filterString) {
+               return
+                       bundleContext().flatMap(
+                       bundleContext ->
 
-       public static <A, B, C, D> OSGi<D> combine(Function3<A, B, C, D> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c) {
-               return c.applyTo(OSGi.combine((A aa, B bb) -> fun.curried().apply(aa).apply(bb), a, b));
-       }
+                       serviceReferences(clazz, filterString).map(
+                               CachingServiceReference::getServiceReference
+                       ).flatMap(
+                               sr -> {
+                                       T service = bundleContext.getService(sr);
 
-       public static <A, B, C, D, E> OSGi<E> combine(Function4<A, B, C, D, E> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d) {
-               return d.applyTo(OSGi.combine((A aa, B bb, C cc) -> fun.curried().apply(aa).apply(bb).apply(cc), a, b, c));
+                                       return
+                                               onClose(() -> bundleContext.ungetService(sr)).then(
+                                               just(service)
+                                       );
+                       }
+               ));
        }
 
-       public static <A, B, C, D, E, F> OSGi<F> combine(Function5<A, B, C, D, E, F> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d, OSGi<E> e) {
-               return e.applyTo(OSGi.combine((A aa, B bb, C cc, D dd) -> fun.curried().apply(aa).apply(bb).apply(cc).apply(dd), a, b, c, d));
+       default <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
+               return fun.flatMap(this::map);
        }
 
-       public static <A, B, C, D, E, F, G> OSGi<G> combine(Function6<A, B, C, D, E, F, G> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c, OSGi<D> d, OSGi<E> e, OSGi<F> f) {
-               return f.applyTo(OSGi.combine((A aa, B bb, C cc, D dd, E ee) -> fun.curried().apply(aa).apply(bb).apply(cc).apply(dd).apply(ee), a, b, c, d, e));
-       }
+       <S> OSGi<S> choose(
+               Predicate<T> chooser, Function<OSGi<T>, OSGi<S>> then,
+               Function<OSGi<T>, OSGi<S>> otherwise);
+
+       <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... funs);
+
+       OSGi<T> effects(
+               Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
+
+       OSGi<T> filter(Predicate<T> predicate);
+
+       <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun);
+
+       OSGi<Void> foreach(Consumer<? super T> onAdded);
+
+       OSGi<Void> foreach(
+               Consumer<? super T> onAdded, Consumer<? super T> onRemoved);
+
+       <S> OSGi<S> map(Function<? super T, ? extends S> function);
+
+       OSGi<T> recover(BiFunction<T, Exception, T> onError);
+
+       OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError);
+
+       <K, S> OSGi<S> splitBy(
+               Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun);
+
+       <S> OSGi<S> then(OSGi<S> next);
+
+       <S> OSGi<S> transform(Transformer<T, S> fun);
 
 }

Copied: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java
 (from r1815580, 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java)
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java?p2=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java&p1=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java&r1=1815580&r2=1815581&rev=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/SentEvent.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Publisher.java
 Fri Nov 17 16:00:41 2017
@@ -17,12 +17,17 @@
 
 package org.apache.aries.osgi.functional;
 
+import java.util.function.Function;
+
 /**
  * @author Carlos Sierra Andrés
  */
-public interface SentEvent<T> {
+public interface Publisher<T> extends Function<T, Runnable> {
+
+    default Runnable apply(T t) {
+        return publish(t);
+    }
 
-    Event<T> getEvent();
+    Runnable publish(T t);
 
-    void terminate();
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Transformer.java
 Fri Nov 17 16:00:41 2017
@@ -6,5 +6,13 @@ import java.util.function.Function;
  * @author Carlos Sierra Andrés
  */
 public interface Transformer<T, R> extends
-    Function<Function<R, Runnable>, Function<T, Runnable>> {
+    Function<Publisher<R>, Publisher<T>> {
+
+    @Override
+    default Publisher<T> apply(Publisher<R> pipe) {
+        return transform(pipe);
+    }
+
+    Publisher<T> transform(Publisher<R> pipe);
+
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/Utils.java
 Fri Nov 17 16:00:41 2017
@@ -1,24 +1,19 @@
 package org.apache.aries.osgi.functional;
 
-import org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList;
+import org.apache.aries.osgi.functional.internal.AccumulateTransformer;
 import org.apache.aries.osgi.functional.internal.HighestRankingOSGi;
 
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiPredicate;
 import java.util.function.Function;
 
-import static org.apache.aries.osgi.functional.OSGi.NOOP;
-
 /**
  * @author Carlos Sierra Andrés
  */
 public interface Utils {
 
-    static <T extends Comparable<? super T>> OSGi<T> highest(OSGi<T> program) {
-        return highest(program, Comparator.naturalOrder());
+    static <T> OSGi<List<T>> accumulate(OSGi<T> program) {
+        return program.transform(new AccumulateTransformer<>());
     }
 
     static <T> OSGi<T> highest(
@@ -28,58 +23,14 @@ public interface Utils {
     }
 
     static <T> OSGi<T> highest(
-        OSGi<T> program, Comparator<? super T> comparator, Function<OSGi<T>, OSGi<T>> notHighest) {
+        OSGi<T> program, Comparator<? super T> comparator,
+        Function<OSGi<T>, OSGi<T>> notHighest) {
 
         return new HighestRankingOSGi<>(program, comparator, notHighest);
     }
 
-    static <T> OSGi<List<T>> accumulate(OSGi<T> program) {
-        return program.transform(op -> {
-            ConcurrentDoublyLinkedList<T> list =
-                new ConcurrentDoublyLinkedList<>();
-
-            AtomicReference<Runnable> terminator = new AtomicReference<>(NOOP);
-
-            return t -> {
-                ConcurrentDoublyLinkedList.Node node = list.addLast(t);
-
-                publish(op, list, terminator);
-
-                return () -> {
-                    node.remove();
-
-                    publish(op, list, terminator);
-                };
-            };
-        });
-    }
-
-    static <T> void publish(Function<List<T>, Runnable> op, ConcurrentDoublyLinkedList<T> list, AtomicReference<Runnable> terminator) {
-        Runnable runnable = terminator.get();
-
-        runnable.run();
-
-        terminator.set(op.apply(new ArrayList<>(list)));
-    }
-
-    static <T> OSGi<T> republishIf(
-        BiPredicate<T, T> refresher, OSGi<T> program) {
-
-        return program.transform(op -> {
-            AtomicReference<T> old = new AtomicReference<>();
-            AtomicReference<Runnable> terminator = new AtomicReference<>(NOOP);
-
-            return t -> {
-                if (refresher.test(old.get(), t)) {
-                    terminator.get().run();
-
-                    old.set(t);
-                    terminator.set(op.apply(t));
-                }
-
-                return () -> {};
-            };
-        });
+    static <T extends Comparable<? super T>> OSGi<T> highest(OSGi<T> program) {
+        return highest(program, Comparator.naturalOrder());
     }
 
 }

Added: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java?rev=1815581&view=auto
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java
 (added)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/AccumulateTransformer.java
 Fri Nov 17 16:00:41 2017
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aries.osgi.functional.internal;
+
+import org.apache.aries.osgi.functional.Publisher;
+import org.apache.aries.osgi.functional.Transformer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import static org.apache.aries.osgi.functional.OSGi.NOOP;
+
+/**
+ * @author Carlos Sierra Andrés
+ */
+public class AccumulateTransformer<T> implements Transformer<T, List<T>> {
+
+    @Override
+    public Publisher<T> transform(Publisher<List<T>> op) {
+        ConcurrentDoublyLinkedList<T> list =
+            new ConcurrentDoublyLinkedList<>();
+
+        AtomicReference<Runnable> terminator = new AtomicReference<>(NOOP);
+
+        return t -> {
+            ConcurrentDoublyLinkedList.Node node = list.addLast(t);
+
+            publish(op, list, terminator);
+
+            return () -> {
+                node.remove();
+
+                publish(op, list, terminator);
+            };
+        };
+    }
+
+    private static <T> void publish(
+        Function<List<T>, Runnable> op, ConcurrentDoublyLinkedList<T> list,
+        AtomicReference<Runnable> terminator) {
+
+        Runnable runnable = terminator.get();
+
+        runnable.run();
+
+        terminator.set(op.apply(new ArrayList<>(list)));
+    }
+
+}

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
 Fri Nov 17 16:00:41 2017
@@ -17,23 +17,16 @@
 
 package org.apache.aries.osgi.functional.internal;
 
-import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.OSGiResult;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleEvent;
 import org.osgi.util.tracker.BundleTracker;
 import org.osgi.util.tracker.BundleTrackerCustomizer;
 
-import java.util.function.Consumer;
-import java.util.function.Function;
-
 /**
  * @author Carlos Sierra Andrés
  */
 public class BundleOSGi extends OSGiImpl<Bundle> {
 
-       private final int _stateMask;
-
        public BundleOSGi(int stateMask) {
                super((bundleContext, op) -> {
                        BundleTracker<Runnable> bundleTracker =
@@ -68,54 +61,6 @@ public class BundleOSGi extends OSGiImpl
                                bundleTracker::open, bundleTracker::close);
                });
 
-               _stateMask = stateMask;
-       }
-
-       @Override
-       public <S> OSGiImpl<S> flatMap(
-               Function<? super Bundle, OSGi<? extends S>> fun) {
-
-               return new OSGiImpl<>((bundleContext, op) -> {
-                       BundleTracker<OSGiResult> bundleTracker =
-                               new BundleTracker<>(
-                                       bundleContext, _stateMask,
-                                       new BundleTrackerCustomizer<OSGiResult>() {
-
-                                               @Override
-                                               public OSGiResult addingBundle(
-                                                       Bundle bundle, BundleEvent bundleEvent) {
-
-                                                       OSGiImpl<S> program = (OSGiImpl<S>) fun.apply(
-                                                               bundle);
-
-                                                       OSGiResultImpl result = program._operation.run(
-                                                               bundleContext, op);
-
-                                                       result.start();
-
-                                                       return result;
-                                               }
-
-                                               @Override
-                                               public void modifiedBundle(
-                                                       Bundle bundle, BundleEvent bundleEvent,
-                                                       OSGiResult result) {
-
-                                               }
-
-                                               @Override
-                                               public void removedBundle(
-                                                       Bundle bundle, BundleEvent bundleEvent,
-                                                       OSGiResult result) {
-
-                                                       result.close();
-                                               }
-                                       });
-
-                       return new OSGiResultImpl(
-                               bundleTracker::open, bundleTracker::close);
-
-               });
        }
 
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
 Fri Nov 17 16:00:41 2017
@@ -28,8 +28,7 @@ import java.util.concurrent.atomic.Atomi
 /**
  * @author Carlos Sierra Andrés
  */
-public class ConfigurationOSGiImpl
-       extends OSGiImpl<Dictionary<String, ?>> {
+public class ConfigurationOSGiImpl extends OSGiImpl<Dictionary<String, ?>> {
 
        public ConfigurationOSGiImpl(String pid) {
                super((bundleContext, op) -> {

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/HighestRankingOSGi.java
 Fri Nov 17 16:00:41 2017
@@ -40,10 +40,10 @@ public class HighestRankingOSGi<T> exten
                 comparing.reversed());
             AtomicReference<Tuple<T>> sent = new AtomicReference<>();
 
-            Function<T, Runnable> notHighestPipe = ProbeImpl.getProbePipe(
-                notHighest, bundleContext, __ -> () -> {});
+            Pad<T, T> notHighestPad = new Pad<>(
+                bundleContext, notHighest, __ -> NOOP);
 
-            return ((OSGiImpl<T>)previous)._operation.run(
+            OSGiResultImpl result = ((OSGiImpl<T>) previous)._operation.run(
                 bundleContext,
                 t -> {
                     Tuple<T> tuple = new Tuple<>(t);
@@ -57,15 +57,14 @@ public class HighestRankingOSGi<T> exten
                             if (old != null) {
                                 old._runnable.run();
 
-                                old._runnable = notHighestPipe.apply(old._t);
+                                old._runnable = notHighestPad.publish(old._t);
                             }
 
                             tuple._runnable = highestPipe.apply(t);
 
                             sent.set(tuple);
-                        }
-                        else {
-                            tuple._runnable = notHighestPipe.apply(t);
+                        } else {
+                            tuple._runnable = notHighestPad.publish(t);
                         }
                     }
 
@@ -91,14 +90,19 @@ public class HighestRankingOSGi<T> exten
                         }
                     };
                 });
+
+            return new OSGiResultImpl(
+                result::start,
+                () -> {
+                    result.close();
+
+                    notHighestPad.close();
+                });
         });
     }
 
     private static class Tuple<T> {
 
-        T _t;
-        Runnable _runnable;
-
         Tuple(T t) {
             _t = t;
         }
@@ -106,6 +110,8 @@ public class HighestRankingOSGi<T> exten
         public T getT() {
             return _t;
         }
+        T _t;
+        Runnable _runnable;
 
     }
 

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/IgnoreImpl.java
 Fri Nov 17 16:00:41 2017
@@ -27,8 +27,7 @@ public class IgnoreImpl extends OSGiImpl
     public IgnoreImpl(OSGi<?> program) {
 
         super((bundleContext, op) ->
-            ((OSGiImpl<?>) program)._operation.run(
-                bundleContext, t -> () -> {}));
+            ((OSGiImpl<?>) program)._operation.run(bundleContext, t -> NOOP));
     }
 
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 Fri Nov 17 16:00:41 2017
@@ -19,7 +19,8 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.Utils;
+import org.apache.aries.osgi.functional.Publisher;
+import org.apache.aries.osgi.functional.Transformer;
 import 
org.apache.aries.osgi.functional.internal.ConcurrentDoublyLinkedList.Node;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Filter;
@@ -47,27 +48,76 @@ public class OSGiImpl<T> implements OSGi
        }
 
        @Override
-       public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> 
fun) {
-               return new FlatMapImpl<>(this, fun);
-       }
+       public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
+               return new OSGiImpl<>(
+                       (bundleContext, op) -> {
+                               AtomicReference<OSGiResult> myCloseReference =
+                                       new AtomicReference<>();
 
-       @Override
-       public OSGi<Void> foreach(Consumer<? super T> onAdded) {
-               return foreach(onAdded, ign -> {});
-       }
+                               AtomicReference<OSGiResult> otherCloseReference 
=
+                                       new AtomicReference<>();
 
-       @Override
-       public OSGi<Void> foreach(
-               Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+                               ConcurrentDoublyLinkedList<T> identities =
+                                       new ConcurrentDoublyLinkedList<>();
 
-               return OSGi.ignore(effects(onAdded, onRemoved));
-       }
+                               ConcurrentDoublyLinkedList<Function<T, S>> funs 
=
+                                       new ConcurrentDoublyLinkedList<>();
 
-       @Override
-       public <S> OSGi<S> transform(
-               Function<Function<S, Runnable>, Function<T, Runnable>> fun) {
+                               return new OSGiResultImpl(
+                                       () -> {
+                                               OSGiResultImpl or1 = 
_operation.run(
+                                                       bundleContext,
+                                                       t -> {
+                                                               Node node = 
identities.addLast(t);
 
-               return new TransformerOSGi<>(this, fun);
+                                                               List<Runnable> 
terminators = funs.stream().map(
+                                                                       f -> 
op.apply(f.apply(t))
+                                                               ).collect(
+                                                                       
Collectors.toList()
+                                                               );
+
+                                                               return () -> {
+                                                                       
node.remove();
+
+                                                                       
terminators.forEach(Runnable::run);
+                                                               };
+                                                       }
+                                               );
+
+                                               myCloseReference.set(or1);
+
+                                               OSGiResultImpl funRun =
+                                                       ((OSGiImpl<Function<T, 
S>>) fun)._operation.run(
+                                                               bundleContext,
+                                                               f -> {
+                                                                       Node 
node = funs.addLast(f);
+
+                                                                       
List<Runnable> terminators =
+                                                                               
identities.stream().map(
+                                                                               
        t -> op.apply(f.apply(t))
+                                                                               
).collect(
+                                                                               
        Collectors.toList()
+                                                                               
);
+
+                                                                       return 
() -> {
+                                                                               
node.remove();
+
+                                                                               
terminators.forEach(Runnable::run);
+                                                                       };
+                                                               });
+
+                                               otherCloseReference.set(funRun);
+
+                                               or1.start();
+
+                                               funRun.start();
+                                       },
+                                       () -> {
+                                               myCloseReference.get().close();
+
+                                               
otherCloseReference.get().close();
+                                       });
+                       });
        }
 
        @Override
@@ -76,20 +126,25 @@ public class OSGiImpl<T> implements OSGi
                Function<OSGi<T>, OSGi<S>> otherwise) {
 
                return new OSGiImpl<>((bundleContext, publisher) -> {
-                       Function<T, Runnable> thenPipe = 
ProbeImpl.getProbePipe(then, bundleContext, publisher);
-
-                       Function<T, Runnable> elsePipe = 
ProbeImpl.getProbePipe(otherwise, bundleContext, publisher);
+                       Pad<T, S> thenPad = new Pad<>(bundleContext, then, 
publisher);
+                       Pad<T, S> elsePad = new Pad<>(bundleContext, otherwise, 
publisher);
 
-                       return _operation.run(
+                       OSGiResultImpl result = _operation.run(
                                bundleContext,
                                t -> {
                                        if (chooser.test(t)) {
-                                               return thenPipe.apply(t);
-                                       }
-                                       else {
-                                               return elsePipe.apply(t);
+                                               return thenPad.publish(t);
+                                       } else {
+                                               return elsePad.publish(t);
                                        }
                                });
+                       return new OSGiResultImpl(
+                               result::start,
+                               () -> {
+                                       thenPad.close();
+                                       elsePad.close();
+                                       result.close();
+                               });
                });
        }
 
@@ -97,60 +152,102 @@ public class OSGiImpl<T> implements OSGi
        @SafeVarargs
        public final <S> OSGi<S> distribute(Function<OSGi<T>, OSGi<S>> ... 
funs) {
                return new OSGiImpl<>((bundleContext, publisher) -> {
-                       List<Function<T, Runnable>> pipes =
+                       List<Pad<T, S>> pads =
                                Arrays.stream(
                                        funs
                                ).map(
-                                       fun -> ProbeImpl.getProbePipe(fun, 
bundleContext, publisher)
+                                       fun -> new Pad<>(bundleContext, fun, 
publisher)
                                ).collect(
                                        Collectors.toList()
                        );
 
-                       return _operation.run(
+                       OSGiResultImpl result = _operation.run(
                                bundleContext,
                                t -> {
                                        List<Runnable> terminators =
-                                               pipes.stream().map(p -> 
p.apply(t)).collect(
+                                               pads.stream().map(p -> 
p.publish(t)).collect(
                                                        Collectors.toList());
 
                                        return () -> {
                                                
terminators.forEach(Runnable::run);
                                        };
                                });
+
+                       return new OSGiResultImpl(
+                               result::start,
+                               () -> {
+                                       result.close();
+
+                                       pads.forEach(Pad::close);
+                               });
                });
        }
 
        @Override
-       public <K, S> OSGi<S> splitBy(
-               Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun) {
-
-               return new OSGiImpl<>((bundleContext, op) -> {
-                       HashMap<K, Function<T, Runnable>> pipes = new 
HashMap<>();
-                       HashMap<K, OSGiResult> results = new HashMap<>();
+       public OSGi<T> effects(
+               Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
 
-                       return _operation.run(
+               return new OSGiImpl<>((bundleContext, op) ->
+                       _operation.run(
                                bundleContext,
                                t -> {
-                                       K key = mapper.apply(t);
+                                       onAdded.accept(t);
 
-                                       results.computeIfAbsent(key, __ -> {
-                                               ProbeImpl<T> probe = new 
ProbeImpl<>();
+                                       Runnable terminator;
+                                       try {
+                                               terminator = op.apply(t);
+                                       }
+                                       catch (Exception e) {
+                                               onRemoved.accept(t);
 
-                                               OSGiImpl<S> program = 
(OSGiImpl<S>)fun.apply(probe);
+                                               throw e;
+                                       }
 
-                                               OSGiResult r = 
program._operation.run(
-                                                       bundleContext, op);
+                                       return () -> {
+                                               onRemoved.accept(t);
 
-                                               r.start();
+                                               terminator.run();
+                                       };
+                               }));
+       }
 
-                                               pipes.put(key, 
probe.getOperation());
+       @Override
+       public OSGi<T> filter(Predicate<T> predicate) {
+               return new OSGiImpl<>((bundleContext, op) ->
+                       _operation.run(
+                               bundleContext,
+                               (t) -> {
+                                       if (predicate.test(t)) {
+                                               return op.apply(t);
+                                       }
+                                       else {
+                                               return () -> {};
+                                       }
+                               }
+                       ));
+       }
 
-                                               return r;
-                                       });
+       @Override
+       public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> 
fun) {
+               return new FlatMapImpl<>(this, fun);
+       }
 
-                                       return pipes.get(key).apply(t);
-                               });
-               });
+       @Override
+       public OSGi<Void> foreach(Consumer<? super T> onAdded) {
+               return foreach(onAdded, ign -> {});
+       }
+
+       @Override
+       public OSGi<Void> foreach(
+               Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+
+               return OSGi.ignore(effects(onAdded, onRemoved));
+       }
+
+       @Override
+       public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
+               return new OSGiImpl<>((bundleContext, op) ->
+                       _operation.run(bundleContext, t -> 
op.apply(function.apply(t))));
        }
 
        @Override
@@ -182,10 +279,7 @@ public class OSGiImpl<T> implements OSGi
                                                OSGi<T> errorProgram = 
onError.apply(t, e);
 
                                                OSGiResult result =
-                                                       ((OSGiImpl<T>) 
errorProgram)._operation.run(
-                                                               bundleContext, 
op);
-
-                                               result.start();
+                                                       ((OSGiImpl<T>) 
errorProgram).run(bundleContext, op);
 
                                                return result::close;
                                        }
@@ -194,37 +288,40 @@ public class OSGiImpl<T> implements OSGi
        }
 
        @Override
-       public OSGi<T> effects(
-               Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
+       public <K, S> OSGi<S> splitBy(
+               Function<T, K> mapper, Function<OSGi<T>, OSGi<S>> fun) {
 
-               return new OSGiImpl<>((bundleContext, op) ->
-                       _operation.run(
-                               bundleContext,
-                               t -> {
-                                       onAdded.accept(t);
+               return new OSGiImpl<>((bundleContext, op) -> {
+                       HashMap<K, Pad<T, S>> pads = new HashMap<>();
 
-                                       Runnable terminator;
-                                       try {
-                                               terminator = op.apply(t);
-                                       }
-                                       catch (Exception e) {
-                                               onRemoved.accept(t);
+                       OSGiResultImpl result = _operation.run(
+                               bundleContext,
+                               t ->
+                                       pads.computeIfAbsent(
+                                               mapper.apply(t),
+                                               __ -> new Pad<>(bundleContext, 
fun, op)
+                                       ).apply(t)
+                       );
 
-                                               throw e;
-                                       }
+                       return new OSGiResultImpl(
+                               result::start,
+                               () -> {
+                                       pads.values().forEach(Pad::close);
 
-                                       return () -> {
-                                               onRemoved.accept(t);
+                                       result.close();
+                               });
+               });
+       }
 
-                                               terminator.run();
-                                       };
-                               }));
+       @Override
+       public <S> OSGi<S> then(OSGi<S> next) {
+               return flatMap(ignored -> next);
        }
 
        @Override
-       public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
-               return new OSGiImpl<>((bundleContext, op) ->
-                       _operation.run(bundleContext, t -> 
op.apply(function.apply(t))));
+       public <S> OSGi<S> transform(Transformer<T, S> fun) {
+
+               return new TransformerOSGi<>(this, fun);
        }
 
        @Override
@@ -234,23 +331,15 @@ public class OSGiImpl<T> implements OSGi
 
        @Override
        public OSGiResult run(BundleContext bundleContext, Consumer<T> andThen) 
{
-               OSGiResultImpl osgiResult =
-                       _operation.run(
-                               bundleContext,
-                               t -> {
-                                       andThen.accept(t);
+               return run(bundleContext, t -> {andThen.accept(t); return 
NOOP;});
+       }
 
-                                       return () -> {};
-                               });
+       public OSGiResult run(BundleContext bundleContext, Publisher<T> op) {
+               OSGiResultImpl result = _operation.run(bundleContext, op);
 
-               osgiResult.start();
+               result.start();
 
-               return osgiResult;
-       }
-
-       @Override
-       public <S> OSGi<S> then(OSGi<S> next) {
-               return flatMap(ignored -> next);
+               return result;
        }
 
        static Filter buildFilter(
@@ -302,95 +391,6 @@ public class OSGiImpl<T> implements OSGi
                return stringBuilder.toString();
        }
 
-       @Override
-       public OSGi<T> filter(Predicate<T> predicate) {
-               return new OSGiImpl<>((bundleContext, op) ->
-                       _operation.run(
-                               bundleContext,
-                               (t) -> {
-                                       if (predicate.test(t)) {
-                                               return op.apply(t);
-                                       }
-                                       else {
-                                               return () -> {};
-                                       }
-                               }
-                       ));
-       }
-
-       @Override
-       public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
-               return new OSGiImpl<>(
-                       (bundleContext, op) -> {
-                               AtomicReference<OSGiResult> myCloseReference =
-                                       new AtomicReference<>();
-
-                               AtomicReference<OSGiResult> otherCloseReference 
=
-                                       new AtomicReference<>();
-
-                               ConcurrentDoublyLinkedList<T> identities =
-                                       new ConcurrentDoublyLinkedList<>();
-
-                               ConcurrentDoublyLinkedList<Function<T, S>> funs 
=
-                                       new ConcurrentDoublyLinkedList<>();
-
-                               return new OSGiResultImpl(
-                                       () -> {
-                                               OSGiResultImpl or1 = 
_operation.run(
-                                                       bundleContext,
-                                                       t -> {
-                                                               Node node = 
identities.addLast(t);
-
-                                                               List<Runnable> 
terminators = funs.stream().map(
-                                                                       f -> 
op.apply(f.apply(t))
-                                                               ).collect(
-                                                                       
Collectors.toList()
-                                                               );
-
-                                                               return () -> {
-                                                                       
node.remove();
-
-                                                                       
terminators.forEach(Runnable::run);
-                                                               };
-                                                       }
-                                               );
-
-                                               myCloseReference.set(or1);
-
-                                               OSGiResultImpl funRun =
-                                                       ((OSGiImpl<Function<T, 
S>>) fun)._operation.run(
-                                                               bundleContext,
-                                                               f -> {
-                                                                       Node 
node = funs.addLast(f);
-
-                                                                       
List<Runnable> terminators =
-                                                                               
identities.stream().map(
-                                                                               
        t -> op.apply(f.apply(t))
-                                                                               
).collect(
-                                                                               
        Collectors.toList()
-                                                                               
);
-
-                                                                       return 
() -> {
-                                                                               
node.remove();
-
-                                                                               
terminators.forEach(Runnable::run);
-                                                                       };
-                                                               });
-
-                                               otherCloseReference.set(funRun);
-
-                                               or1.start();
-
-                                               funRun.start();
-                                       },
-                                       () -> {
-                                               myCloseReference.get().close();
-
-                                               
otherCloseReference.get().close();
-                                       });
-                       });
-       }
-
 }
 
 

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
 Fri Nov 17 16:00:41 2017
@@ -19,6 +19,7 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.OSGiOperation;
 import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.Publisher;
 import org.osgi.framework.BundleContext;
 
 import java.util.function.Function;
@@ -28,8 +29,7 @@ import java.util.function.Function;
  */
 interface OSGiOperationImpl<T> extends OSGiOperation<T> {
 
-       OSGiResultImpl run(
-               BundleContext bundleContext, Function<T, Runnable> op);
+       OSGiResultImpl run(BundleContext bundleContext, Publisher<T> op);
 
        @Override
        default OSGiResult run(BundleContext bundleContext) {

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
 Fri Nov 17 16:00:41 2017
@@ -26,19 +26,25 @@ import java.util.concurrent.atomic.Atomi
  */
 public class OSGiResultImpl implements OSGiResult {
 
-       private final Runnable start;
-       private final Runnable close;
-       private AtomicBoolean _working = new AtomicBoolean();
-       private AtomicBoolean _closed = new AtomicBoolean();
-       private volatile boolean _started = false;
-
-
        public OSGiResultImpl(Runnable start, Runnable close) {
                this.start = start;
                this.close = close;
        }
 
        @Override
+       public void close() {
+               while (!_working.compareAndSet(false, true)) {
+                       Thread.yield();
+               }
+
+               if (_closed.compareAndSet(false, true) && _started) {
+                       close.run();
+               }
+
+               _working.set(false);
+       }
+
+       @Override
        public void start() {
                if (_working.compareAndSet(false, true)) {
 
@@ -53,17 +59,10 @@ public class OSGiResultImpl implements O
 
        }
 
-       @Override
-       public void close() {
-               while (!_working.compareAndSet(false, true)) {
-                       Thread.yield();
-               }
-
-               if (_closed.compareAndSet(false, true) && _started) {
-                       close.run();
-               }
-
-               _working.set(false);
-       }
+       private final Runnable start;
+       private final Runnable close;
+       private AtomicBoolean _working = new AtomicBoolean();
+       private AtomicBoolean _closed = new AtomicBoolean();
+       private volatile boolean _started = false;
 
 }

Copied: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java
 (from r1815580, 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java)
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java?p2=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java&p1=aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java&r1=1815580&r2=1815581&rev=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pad.java
 Fri Nov 17 16:00:41 2017
@@ -17,23 +17,48 @@
 
 package org.apache.aries.osgi.functional.internal;
 
-import org.apache.aries.osgi.functional.OSGiOperation;
+import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.Publisher;
 import org.osgi.framework.BundleContext;
 
+import java.io.Closeable;
 import java.util.function.Function;
 
+import static org.apache.aries.osgi.functional.OSGi.NOOP;
+
 /**
  * @author Carlos Sierra Andrés
  */
-interface OSGiOperationImpl<T> extends OSGiOperation<T> {
+public class Pad<T, S> implements Publisher<T>, Closeable {
+
+    public Pad(
+        BundleContext bundleContext,
+        Function<OSGi<T>, OSGi<S>> fun,
+        Publisher<S> continuation) {
+
+        ProbeImpl<T> probe = new ProbeImpl<>();
+
+        OSGiImpl<S> next = (OSGiImpl<S>) fun.apply(probe);
+
+        _result = next.run(bundleContext, continuation);
+
+        _publisher =
+            probe.getPublisher() != null ?
+                probe.getPublisher() :
+                __ -> NOOP;
+    }
 
-       OSGiResultImpl run(
-               BundleContext bundleContext, Function<T, Runnable> op);
+    @Override
+    public void close() {
+        _result.close();
+    }
 
-       @Override
-       default OSGiResult run(BundleContext bundleContext) {
-               return run(bundleContext, (__) -> () -> {});
-       }
+    @Override
+    public Runnable publish(T t) {
+        return _publisher.publish(t);
+    }
 
+    private final OSGiResult _result;
+    private final Publisher<T> _publisher;
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ProbeImpl.java
 Fri Nov 17 16:00:41 2017
@@ -17,12 +17,9 @@
 
 package org.apache.aries.osgi.functional.internal;
 
-import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.OSGiResult;
+import org.apache.aries.osgi.functional.Publisher;
 import org.osgi.framework.BundleContext;
 
-import java.util.function.Function;
-
 /**
  * @author Carlos Sierra Andrés
  */
@@ -32,41 +29,22 @@ public class ProbeImpl<T> extends OSGiIm
         super(new ProbeOperationImpl<>());
     }
 
-    public Function<T, Runnable> getOperation() {
+    public Publisher<T> getPublisher() {
         return ((ProbeOperationImpl<T>) _operation)._op;
     }
 
-    public static <T, S> Function<T, Runnable> getProbePipe(
-        Function<OSGi<T>, OSGi<S>> then, BundleContext bundleContext,
-        Function<S, Runnable> publisher) {
-
-        ProbeImpl<T> thenProbe = new ProbeImpl<>();
-
-        OSGiImpl<S> thenNext = (OSGiImpl<S>) then.apply(thenProbe);
-
-        OSGiResult thenResult = thenNext._operation.run(
-            bundleContext, publisher);
-
-        Function<T, Runnable> thenPipe = thenProbe.getOperation();
-
-        thenResult.start();
-
-        return thenPipe;
-    }
-
     private static class ProbeOperationImpl<T> implements OSGiOperationImpl<T> {
 
-        BundleContext _bundleContext;
-        Function<T, Runnable> _op;
-
         @Override
         public OSGiResultImpl run(
-            BundleContext bundleContext, Function<T, Runnable> op) {
+            BundleContext bundleContext, Publisher<T> op) {
             _bundleContext = bundleContext;
             _op = op;
 
             return new OSGiResultImpl(NOOP, NOOP);
         }
+        BundleContext _bundleContext;
+        Publisher<T> _op;
     }
 
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java Fri Nov 17 16:00:41 2017
@@ -56,10 +56,6 @@ public class ServiceReferenceOSGi<T>
        private static class DefaultServiceTrackerCustomizer<T>
                implements ServiceTrackerCustomizer<T, Tracked<T>> {
 
-               private final Function<CachingServiceReference<T>, Runnable>
-                       _addedSource;
-               private Refresher<? super CachingServiceReference<T>> _refresher;
-
                public DefaultServiceTrackerCustomizer(
                        Function<CachingServiceReference<T>, Runnable> addedSource,
                        Refresher<? super CachingServiceReference<T>> refresher) {
@@ -97,13 +93,15 @@ public class ServiceReferenceOSGi<T>
 
                        tracked.runnable.run();
                }
+
+               private final Function<CachingServiceReference<T>, Runnable>
+                       _addedSource;
+               private Refresher<? super CachingServiceReference<T>> _refresher;
+
        }
 
        private static class Tracked<T> {
 
-               volatile CachingServiceReference<T> cachingServiceReference;
-               volatile Runnable runnable;
-
                public Tracked(
                        CachingServiceReference<T> cachingServiceReference,
                        Runnable runnable) {
@@ -111,5 +109,9 @@ public class ServiceReferenceOSGi<T>
                        this.cachingServiceReference = cachingServiceReference;
                        this.runnable = runnable;
                }
+
+               volatile CachingServiceReference<T> cachingServiceReference;
+               volatile Runnable runnable;
+
        }
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/TransformerOSGi.java Fri Nov 17 16:00:41 2017
@@ -17,19 +17,17 @@
 
 package org.apache.aries.osgi.functional.internal;
 
-import java.util.function.Function;
+import org.apache.aries.osgi.functional.Transformer;
 
 /**
  * @author Carlos Sierra Andrés
  */
 public class TransformerOSGi<T, R> extends OSGiImpl<R> {
 
-    public TransformerOSGi(
-        OSGiImpl<T> previous,
-        Function<Function<R, Runnable>, Function<T, Runnable>> fun) {
+    public TransformerOSGi(OSGiImpl<T> previous, Transformer<T, R> fun) {
 
         super((bundleContext, op) ->
-            previous._operation.run(bundleContext, fun.apply(op)));
+            previous._operation.run(bundleContext, fun.transform(op)));
     }
 
 }

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/internal/ProbeTests.java Fri Nov 17 16:00:41 2017
@@ -18,14 +18,12 @@
 package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.SentEvent;
 import org.apache.aries.osgi.functional.test.DSLTest;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
 
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -70,11 +68,11 @@ public class ProbeTests {
 
         program.run(bundleContext, result::set);
 
-        Function<String, Runnable> opA = probeA.getOperation();
+        Function<String, Runnable> opA = probeA.getPublisher();
 
         Runnable sentA = opA.apply("Hello");
 
-        Function<String, Runnable> opB = probeBreference.get().getOperation();
+        Function<String, Runnable> opB = probeBreference.get().getPublisher();
 
         sentA.run();
 
@@ -85,10 +83,10 @@ public class ProbeTests {
 
         program.run(bundleContext, result::set);
 
-        opA = probeA.getOperation();
+        opA = probeA.getPublisher();
         sentA = opA.apply("Hello");
 
-        opB = probeBreference.get().getOperation();
+        opB = probeBreference.get().getPublisher();
         sentB = opB.apply(", World");
 
         assertEquals("Hello, World", result.get());
@@ -117,7 +115,7 @@ public class ProbeTests {
         program.run(bundleContext, result::set);
         assertEquals(0, result.get());
 
-        Function<Integer, Runnable> opA = probeA.getOperation();
+        Function<Integer, Runnable> opA = probeA.getPublisher();
 
         Runnable sentA = opA.apply(5);
         assertEquals(15, result.get());

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/AsynchronousTest.java Fri Nov 17 16:00:41 2017
@@ -19,7 +19,6 @@ package org.apache.aries.osgi.functional
 
 import org.apache.aries.osgi.functional.OSGi;
 import org.apache.aries.osgi.functional.OSGiResult;
-import org.apache.aries.osgi.functional.SentEvent;
 import org.apache.aries.osgi.functional.internal.ProbeImpl;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -337,9 +336,9 @@ public class AsynchronousTest {
 
         result.start();
 
-        Function<Integer, Runnable> opa = ((ProbeImpl<Integer>) as).getOperation();
-        Function<Integer, Runnable> opb = ((ProbeImpl<Integer>) bs).getOperation();
-        Function<Integer, Runnable> opc = ((ProbeImpl<Integer>) cs).getOperation();
+        Function<Integer, Runnable> opa = ((ProbeImpl<Integer>) as).getPublisher();
+        Function<Integer, Runnable> opb = ((ProbeImpl<Integer>) bs).getPublisher();
+        Function<Integer, Runnable> opc = ((ProbeImpl<Integer>) cs).getPublisher();
 
         ExecutorService executor = Executors.newFixedThreadPool(8);
 

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java Fri Nov 17 16:00:41 2017
@@ -23,6 +23,7 @@ import org.apache.aries.osgi.functional.
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;

Modified: 
aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java?rev=1815581&r1=1815580&r2=1815581&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java Fri Nov 17 16:00:41 2017
@@ -712,7 +712,7 @@ public class DSLTest {
 
         once.run(bundleContext);
 
-        Function<Integer, Runnable> op = probe.getOperation();
+        Function<Integer, Runnable> op = probe.getPublisher();
 
         assertEquals(0, count.get());
 


Reply via email to