Author: csierra
Date: Tue Oct 10 15:51:22 2017
New Revision: 1811723

URL: http://svn.apache.org/viewvc?rev=1811723&view=rev
Log:
[Component-DSL] refactor to remove Pipe

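Pipe is no longer needed: each operation now receives the downstream consumer
directly, and results are started explicitly through OSGiResult.start().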

Removed:
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java
Modified:
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiOperation.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiResult.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiRunnable.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ChangeContextOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/NothingOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java
    aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
    aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
    aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiOperation.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiOperation.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiOperation.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiOperation.java
 Tue Oct 10 15:51:22 2017
@@ -24,6 +24,6 @@ import org.osgi.framework.BundleContext;
  */
 public interface OSGiOperation<T> {
 
-       OSGiResult<T> run(BundleContext bundleContext);
+       OSGiResult run(BundleContext bundleContext);
 
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiResult.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiResult.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiResult.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiResult.java
 Tue Oct 10 15:51:22 2017
@@ -20,8 +20,10 @@ package org.apache.aries.osgi.functional
 /**
  * @author Carlos Sierra Andrés
  */
-public interface OSGiResult<T> extends AutoCloseable {
+public interface OSGiResult extends AutoCloseable {
 
        @Override
        public void close();
+
+       public void start();
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiRunnable.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiRunnable.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiRunnable.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/OSGiRunnable.java
 Tue Oct 10 15:51:22 2017
@@ -25,7 +25,7 @@ import java.util.function.Consumer;
  * @author Carlos Sierra Andrés
  */
 public interface OSGiRunnable<T> {
-       OSGiResult<T> run(BundleContext bundleContext);
+       OSGiResult run(BundleContext bundleContext);
 
-       OSGiResult<T> run(BundleContext bundleContext, Consumer<T> andThen);
+       OSGiResult run(BundleContext bundleContext, Consumer<T> andThen);
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleContextOSGiImpl.java
 Tue Oct 10 15:51:22 2017
@@ -25,7 +25,7 @@ import org.osgi.framework.BundleContext;
 public class BundleContextOSGiImpl extends OSGiImpl<BundleContext> {
 
        public BundleContextOSGiImpl() {
-               super(bundleContext ->
-                       new 
JustOSGiImpl<>(bundleContext)._operation.run(bundleContext));
+               super((bundleContext, op) ->
+                       new 
JustOSGiImpl<>(bundleContext)._operation.run(bundleContext, op));
        }
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
 Tue Oct 10 15:51:22 2017
@@ -35,11 +35,7 @@ public class BundleOSGi extends OSGiImpl
        private final int _stateMask;
 
        public BundleOSGi(int stateMask) {
-               super(bundleContext -> {
-                       Pipe<Bundle, Bundle> added = Pipe.create();
-
-                       Consumer<Tuple<Bundle>> addedSource = added.getSource();
-
+               super((bundleContext, op) -> {
                        BundleTracker<Tuple<Bundle>> bundleTracker =
                                new BundleTracker<>(
                                        bundleContext, stateMask,
@@ -51,7 +47,7 @@ public class BundleOSGi extends OSGiImpl
 
                                                        Tuple<Bundle> tuple = 
Tuple.create(bundle);
 
-                                                       
addedSource.accept(tuple);
+                                                       op.accept(tuple);
 
                                                        return tuple;
                                                }
@@ -75,8 +71,8 @@ public class BundleOSGi extends OSGiImpl
                                                }
                                        });
 
-                       return new OSGiResultImpl<>(
-                               added, bundleTracker::open, 
bundleTracker::close);
+                       return new OSGiResultImpl(
+                               bundleTracker::open, bundleTracker::close);
                });
 
                _stateMask = stateMask;
@@ -86,27 +82,23 @@ public class BundleOSGi extends OSGiImpl
        public <S> OSGiImpl<S> flatMap(
                Function<? super Bundle, OSGi<? extends S>> fun) {
 
-               return new OSGiImpl<>(bundleContext -> {
-                       Pipe<S, S> added = Pipe.create();
-
-                       Consumer<Tuple<S>> addedSource = added.getSource();
-
-                       BundleTracker<OSGiResult<S>> bundleTracker =
+               return new OSGiImpl<>((bundleContext, op) -> {
+                       BundleTracker<OSGiResult> bundleTracker =
                                new BundleTracker<>(
                                        bundleContext, _stateMask,
-                                       new 
BundleTrackerCustomizer<OSGiResult<S>>() {
+                                       new 
BundleTrackerCustomizer<OSGiResult>() {
 
                                                @Override
-                                               public OSGiResult<S> 
addingBundle(
+                                               public OSGiResult addingBundle(
                                                        Bundle bundle, 
BundleEvent bundleEvent) {
 
                                                        OSGiImpl<S> program = 
(OSGiImpl<S>) fun.apply(
                                                                bundle);
 
-                                                       OSGiResultImpl<S> 
result =
-                                                               
program._operation.run(bundleContext);
+                                                       OSGiResultImpl result = 
program._operation.run(
+                                                               bundleContext, 
op);
 
-                                                       
result.pipeTo(addedSource);
+                                                       result.start();
 
                                                        return result;
                                                }
@@ -114,7 +106,7 @@ public class BundleOSGi extends OSGiImpl
                                                @Override
                                                public void modifiedBundle(
                                                        Bundle bundle, 
BundleEvent bundleEvent,
-                                                       OSGiResult<S> result) {
+                                                       OSGiResult result) {
 
                                                        removedBundle(bundle, 
bundleEvent, result);
 
@@ -124,14 +116,14 @@ public class BundleOSGi extends OSGiImpl
                                                @Override
                                                public void removedBundle(
                                                        Bundle bundle, 
BundleEvent bundleEvent,
-                                                       OSGiResult<S> result) {
+                                                       OSGiResult result) {
 
                                                        result.close();
                                                }
                                        });
 
-                       return new OSGiResultImpl<>(
-                               added, bundleTracker::open, 
bundleTracker::close);
+                       return new OSGiResultImpl(
+                               bundleTracker::open, bundleTracker::close);
 
                });
        }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ChangeContextOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ChangeContextOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ChangeContextOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ChangeContextOSGiImpl.java
 Tue Oct 10 15:51:22 2017
@@ -28,6 +28,7 @@ public class ChangeContextOSGiImpl<T> ex
        public ChangeContextOSGiImpl(
                OSGi<T> program, BundleContext bundleContext) {
 
-               super(b -> ((OSGiImpl<T>) 
program)._operation.run(bundleContext));
+               super((b, op) ->
+                       ((OSGiImpl<T>) program)._operation.run(bundleContext, 
op));
        }
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
 Tue Oct 10 15:51:22 2017
@@ -23,7 +23,6 @@ import org.osgi.service.cm.ManagedServic
 import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 
 /**
  * @author Carlos Sierra Andrés
@@ -32,7 +31,7 @@ public class ConfigurationOSGiImpl
        extends OSGiImpl<Dictionary<String, ?>> {
 
        public ConfigurationOSGiImpl(String pid) {
-               super(bundleContext -> {
+               super((bundleContext, op) -> {
                        AtomicReference<Dictionary<String, ?>> atomicReference =
                                new AtomicReference<>(null);
 
@@ -43,26 +42,20 @@ public class ConfigurationOSGiImpl
                        AtomicReference<ServiceRegistration<ManagedService>>
                                serviceRegistrationReferece = new 
AtomicReference<>(null);
 
-                       Pipe<Dictionary<String, ?>, Dictionary<String, ?>> 
added =
-                               Pipe.create();
-
-                       Consumer<Tuple<Dictionary<String, ?>>> addedSource =
-                               added.getSource();
-
                        Runnable start = () ->
                                serviceRegistrationReferece.set(
                                        bundleContext.registerService(
                                                ManagedService.class,
                                                properties -> {
                                                        while 
(!atomicReference.compareAndSet(
-                                                               
tupleAtomicReference.get().t,
+                                                               
tupleAtomicReference.get()._t,
                                                                properties)) {
                                                        }
 
                                                        
Tuple<Dictionary<String, ?>> old =
                                                                
tupleAtomicReference.get();
 
-                                                       if (old.t != null) {
+                                                       if (old._t != null) {
                                                                old.terminate();
                                                        }
 
@@ -70,7 +63,7 @@ public class ConfigurationOSGiImpl
                                                                
Tuple.create(properties);
 
                                                        if (properties != null) 
{
-                                                               
addedSource.accept(tuple);
+                                                               
op.accept(tuple);
                                                        }
 
                                                        
tupleAtomicReference.set(tuple);
@@ -79,8 +72,8 @@ public class ConfigurationOSGiImpl
                                                        put("service.pid", pid);
                                                }}));
 
-                       return new OSGiResultImpl<>(
-                               added, start,
+                       return new OSGiResultImpl(
+                               start,
                                () -> {
                                        
serviceRegistrationReferece.get().unregister();
 

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java
 Tue Oct 10 15:51:22 2017
@@ -35,32 +35,25 @@ public class ConfigurationsOSGiImpl
        extends OSGiImpl<Dictionary<String, ?>> {
 
        public ConfigurationsOSGiImpl(String factoryPid) {
-               super(bundleContext -> {
+               super((bundleContext, op) -> {
                        Map<String, Tuple<Dictionary<String, ?>>> results =
                                new ConcurrentHashMap<>();
 
                        
AtomicReference<ServiceRegistration<ManagedServiceFactory>>
                                serviceRegistrationReference = new 
AtomicReference<>(null);
 
-                       Pipe<Dictionary<String, ?>, Dictionary<String, ?>>
-                               added = Pipe.create();
-
-                       Consumer<Tuple<Dictionary<String, ?>>> addedSource =
-                               added.getSource();
-
                        Runnable start = () ->
                                serviceRegistrationReference.set(
                                        bundleContext.registerService(
                                                ManagedServiceFactory.class,
-                                               new 
ConfigurationsManagedServiceFactory(
-                                                       results, addedSource),
+                                               new 
ConfigurationsManagedServiceFactory(results, op),
                                                new Hashtable<String, Object>() 
{{
                                                        put("service.pid", 
factoryPid);
                                                }}));
 
 
-                       return new OSGiResultImpl<>(
-                               added, start,
+                       return new OSGiResultImpl(
+                               start,
                                () -> {
                                        
serviceRegistrationReference.get().unregister();
 

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
 Tue Oct 10 15:51:22 2017
@@ -33,30 +33,21 @@ public class DistributeOSGi<T> extends O
 
     @SafeVarargs
     public DistributeOSGi(OSGi<T>... programs) {
-        super(bundleContext -> {
-            Pipe<T, T> added = Pipe.create();
+        super((bundleContext, op) -> {
+            List<OSGiResult> results = new ArrayList<>();
 
-            Consumer<Tuple<T>> addedSource = added.getSource();
-
-            List<OSGiResult<T>> results = new ArrayList<>();
-
-            return new OSGiResultImpl<>(
-                added,
-                () ->
+            return new OSGiResultImpl(
+                () -> {
                     results.addAll(
                         Arrays.stream(programs).
-                            map(o -> {
-                                OSGiResultImpl<T> osGiResult =
-                                    ((OSGiImpl<T>) o)._operation.run(
-                                        bundleContext);
-
-                                osGiResult.pipeTo(addedSource);
+                            map(o -> ((OSGiImpl<T>) o)._operation.run(
+                                bundleContext, op)).
+                            collect(Collectors.toList()));
 
-                                return osGiResult;
-                            }).
-                            collect(Collectors.toList())),
+                    results.forEach(OSGiResult::start);
+                },
                 () -> {
-                    for (OSGiResult<?> result : results) {
+                    for (OSGiResult result : results) {
                         try {
                             result.close();
                         }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
 Tue Oct 10 15:51:22 2017
@@ -18,12 +18,7 @@
 package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.OSGiResult;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
@@ -34,40 +29,19 @@ public class FlatMapImpl<T, S> extends O
        public FlatMapImpl(
                OSGiImpl<T> previous, Function<? super T, OSGi<? extends S>> 
fun) {
 
-               super((bundleContext) -> {
-                       AtomicReference<Runnable> closeReference =
-                               new AtomicReference<>(NOOP);
+               super((bundleContext, op) ->
+                       previous._operation.run(
+                               bundleContext,
+                               t -> {
+                                       OSGiImpl<S> program = (OSGiImpl<S>) 
fun.apply(t._t);
 
-                       Pipe<S, S> added = Pipe.create();
+                                       OSGiResultImpl result =
+                                               
program._operation.run(bundleContext, op);
 
-                       Consumer<Tuple<S>> addedSource = added.getSource();
+                                       t.onTermination(result::close);
 
-                       return new OSGiResultImpl<>(
-                               added,
-                               () -> {
-                                       OSGiResultImpl<T> or1 = 
previous._operation.run(
-                                               bundleContext);
-
-                                       closeReference.set(or1.close);
-
-                                       or1.added.map(t -> {
-                                               OSGiImpl<S> program = 
(OSGiImpl<S>)fun.apply((T)t.t);
-
-                                               OSGiResultImpl<S> or2 =
-                                                       
program._operation.run(bundleContext);
-
-                                               t.onTermination(or2::close);
-
-                                               or2.pipeTo(addedSource);
-
-                                               return null;
-                                       });
-
-                                       or1.start.run();
-                               },
-                               () -> closeReference.get().run());
-                       }
-               );
+                                       result.start();
+                               }));
        }
 
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
 Tue Oct 10 15:51:22 2017
@@ -18,85 +18,37 @@
 
 package org.apache.aries.osgi.functional.internal;
 
-import org.apache.aries.osgi.functional.OSGi;
-import org.apache.aries.osgi.functional.OSGiResult;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * @author Carlos Sierra Andrés
  */
 public class JustOSGiImpl<T> extends OSGiImpl<T> {
 
-       private Supplier<Collection<T>> _t;
-
        public JustOSGiImpl(Collection<T> t) {
                this(() -> t);
        }
 
        public JustOSGiImpl(Supplier<Collection<T>> t) {
-               super(((bundleContext) -> {
-
-                       Pipe<T, T> added = Pipe.create();
+               super((bundleContext, op) -> {
 
-                       AtomicReference<Collection<Tuple<T>>> 
collectionAtomicReference =
-                               new AtomicReference<>();
-
-                       return new OSGiResultImpl<>(
-                               added,
-                               () -> {
-                                       List<Tuple<T>> tuples =
-                                               
t.get().stream().map(Tuple::create).collect(
-                                                       Collectors.toList());
-
-                                       collectionAtomicReference.set(tuples);
-
-                                       tuples.forEach(tuple ->
-                                               
added.getSource().accept(tuple));
-                               },
-                               () ->
-                                       
collectionAtomicReference.get().forEach(Tuple::terminate));
-               }));
+                       Collection<Tuple<T>> references =
+                               t.get().stream().map(Tuple::create).collect(
+                                       Collectors.toList());
+
+                       return new OSGiResultImpl(
+                               () -> references.forEach(op),
+                               () -> references.forEach(Tuple::terminate));
+               });
 
-               _t = t;
        }
 
        public JustOSGiImpl(T t) {
                this(() -> Collections.singletonList(t));
        }
 
-       @Override
-       public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> 
fun) {
-               return new OSGiImpl<>(bundleContext -> {
-                       Pipe<S, S> added = Pipe.create();
-
-                       AtomicReference<Runnable> atomicReference = new 
AtomicReference<>(
-                               NOOP);
-
-                       return new OSGiResultImpl<>(
-                               added,
-                               () -> {
-                                       List<OSGiResultImpl<S>> results = 
_t.get().stream().map(
-                                               p -> (OSGiImpl<S>) fun.apply(p)
-                                       ).map(
-                                               n -> 
n._operation.run(bundleContext)
-                                       ).collect(Collectors.toList());
-
-                                       atomicReference.set(
-                                               () -> 
results.forEach(OSGiResult::close));
-
-                                       results.forEach(result -> 
result.pipeTo(added.getSource()));
-                               },
-                               () -> atomicReference.get().run());
-               });
-       }
-
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/NothingOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/NothingOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/NothingOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/NothingOSGiImpl.java
 Tue Oct 10 15:51:22 2017
@@ -25,7 +25,6 @@ import org.apache.aries.osgi.functional.
 public class NothingOSGiImpl<S> extends OSGiImpl<S> {
 
        public NothingOSGiImpl() {
-               super(((bundleContext) -> new OSGiResultImpl<>(
-                       Pipe.create(), OSGi.NOOP, OSGi.NOOP)));
+               super((bundleContext, __) -> new OSGiResultImpl(OSGi.NOOP, 
OSGi.NOOP));
        }
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 Tue Oct 10 15:51:22 2017
@@ -54,46 +54,41 @@ public class OSGiImpl<T> implements OSGi
        public OSGi<Void> foreach(
                Consumer<? super T> onAdded, Consumer<? super T> onRemoved) {
 
-               return new OSGiImpl<>(((bundleContext) -> {
-                       OSGiResultImpl<T> osgiResult = _operation.run(bundleContext);
+               return new OSGiImpl<>((bundleContext, op) ->
+                       _operation.run(
+                               bundleContext,
+               t -> {
+                       t.onTermination(() -> onRemoved.accept(t._t));
 
-                       return new OSGiResultImpl<>(
-                               osgiResult.added.map(
-                                       t -> {
-                                               t.onTermination(() -> onRemoved.accept(t.t));
-
-                                               return t.map(o -> {onAdded.accept(o); return null;});
-                                       }),
-                               osgiResult.start, osgiResult.close);
-               }));
+                       onAdded.accept(t._t);
+
+                                       Tuple<Void> tuple = Tuple.create(null);
+
+                                       t.addRelatedTuple(tuple);
+
+                                       op.accept(tuple);
+               }));
        }
 
        @Override
        public <S> OSGi<S> map(Function<? super T, ? extends S> function) {
-               return new OSGiImpl<>(((bundleContext) -> {
-                       OSGiResultImpl<T> osgiResult = _operation.run(bundleContext);
-
-                       return new OSGiResultImpl<>(
-                               osgiResult.added.map(t -> t.map(function)),
-                               osgiResult.start, osgiResult.close);
-               }));
+               return new OSGiImpl<>((bundleContext, op) ->
+                       _operation.run(bundleContext, t -> op.accept(t.map(function))));
        }
 
        @Override
-       public OSGiResult<T> run(BundleContext bundleContext) {
+       public OSGiResult run(BundleContext bundleContext) {
                return run(bundleContext, x -> {});
        }
 
        @Override
-       public OSGiResult<T> run(BundleContext bundleContext, Consumer<T> andThen) {
-               OSGiResultImpl<T> osgiResult = _operation.run(bundleContext);
+       public OSGiResult run(BundleContext bundleContext, Consumer<T> andThen) {
+               OSGiResultImpl osgiResult =
+                       _operation.run(bundleContext, t -> andThen.accept(t._t));
 
-               osgiResult.added.map(x -> {andThen.accept(x.t); return x;});
+               osgiResult.start();
 
-               osgiResult.start.run();
-
-               return new OSGiResultImpl<>(
-                       osgiResult.added, osgiResult.start, osgiResult.close);
+               return osgiResult;
        }
 
        @Override
@@ -167,47 +162,14 @@ public class OSGiImpl<T> implements OSGi
                return new RouteOsgiImpl<>(this, routerConsumer);
        }
 
-       private static class Pair<X, Y> {
-               private final X _first;
-               private final Y _second;
-
-               public Pair(X first, Y second) {
-                       _first = first;
-                       _second = second;
-               }
-
-               public X getFirst() {
-                       return _first;
-               }
-
-               public Y getSecond() {
-                       return _second;
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) return true;
-                       if (o == null || getClass() != o.getClass()) return false;
-
-                       Pair<?, ?> pair = (Pair<?, ?>) o;
-
-                       return _first.equals(pair._first);
-               }
-
-               @Override
-               public int hashCode() {
-                       return _first.hashCode();
-               }
-       }
-
        @Override
        public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
                return new OSGiImpl<>(
-                       ((bundleContext) -> {
-                               AtomicReference<OSGiResult<?>> myCloseReference =
+                       ((bundleContext, op) -> {
+                               AtomicReference<OSGiResult> myCloseReference =
                                        new AtomicReference<>();
 
-                               AtomicReference<OSGiResult<?>> otherCloseReference =
+                               AtomicReference<OSGiResult> otherCloseReference =
                                        new AtomicReference<>();
 
                                DoublyLinkedList<Tuple<T>> identities =
@@ -216,52 +178,44 @@ public class OSGiImpl<T> implements OSGi
                                DoublyLinkedList<Tuple<Function<T, S>>> funs =
                                        new DoublyLinkedList<>();
 
-                               Pipe<S, S> added = Pipe.create();
-
-                               Consumer<Tuple<S>> addedSource = added.getSource();
-
-                               return new OSGiResultImpl<>(
-                                       added,
+                               return new OSGiResultImpl(
                                        () -> {
-                                               OSGiResultImpl<T> or1 = _operation.run(bundleContext);
+                                               OSGiResultImpl or1 = _operation.run(
+                                                       bundleContext,
+                                                       t -> {
+                                                               synchronized (identities) {
+                                                                       Node<Tuple<T>> node = identities.addLast(t);
+
+                                                                       t.onTermination(node::remove);
+
+                                                                       funs.forEach(
+                                                                               f -> processAdded(op, f, t));
+                                                               }
+                                                       }
+                                               );
 
                                                myCloseReference.set(or1);
 
-                                               or1.added.map(t -> {
-                                                       synchronized (identities) {
-                                                               Node<Tuple<T>> node = identities.addLast(t);
-
-                                                               t.onTermination(node::remove);
-
-                                                               funs.forEach(f -> processAdded(addedSource, f, t));
-
-                                                               return null;
-                                                       }
-                                               });
-
-                                               OSGiResultImpl<Function<T, S>> funRun =
+                                               OSGiResultImpl funRun =
                                                        ((OSGiImpl<Function<T, S>>) fun)._operation.run(
-                                                               bundleContext);
+                                                               bundleContext,
+                                                               f -> {
+                                                                       synchronized (identities) {
+                                                                               Node<Tuple<Function<T, S>>> node =
+                                                                                       funs.addLast(f);
+
+                                                                               f.onTermination(node::remove);
+
+                                                                               identities.forEach(
+                                                                                       t -> processAdded(op, f, t));
+                                                                       }
+                                                               });
 
                                                otherCloseReference.set(funRun);
 
-                                               funRun.added.map(f -> {
-                                                       synchronized (identities) {
-                                                               Node<Tuple<Function<T, S>>> node =
-                                                                       funs.addLast(f);
-
-                                                               f.onTermination(node::remove);
-
-                                                               identities.forEach(
-                                                                       t -> processAdded(addedSource, f, t));
-
-                                                               return null;
-                                                       }
-                                               });
-
-                                               or1.start.run();
+                                               or1.start();
 
-                                               funRun.start.run();
+                                               funRun.start();
                                        },
                                        () -> {
                                                synchronized (identities) {

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiOperationImpl.java Tue Oct 10 15:51:22 2017
@@ -18,14 +18,22 @@
 package org.apache.aries.osgi.functional.internal;
 
 import org.apache.aries.osgi.functional.OSGiOperation;
+import org.apache.aries.osgi.functional.OSGiResult;
 import org.osgi.framework.BundleContext;
 
+import java.util.function.Consumer;
+
 /**
  * @author Carlos Sierra Andrés
  */
 interface OSGiOperationImpl<T> extends OSGiOperation<T> {
 
+       OSGiResultImpl run(
+               BundleContext bundleContext, Consumer<Tuple<T>> consumer);
+
        @Override
-       OSGiResultImpl<T> run(BundleContext bundleContext);
+       default OSGiResult run(BundleContext bundleContext) {
+               return run(bundleContext, (__) -> {});
+       }
 
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java Tue Oct 10 15:51:22 2017
@@ -20,33 +20,29 @@ package org.apache.aries.osgi.functional
 import org.apache.aries.osgi.functional.OSGiResult;
 
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * @author Carlos Sierra Andrés
  */
-public class OSGiResultImpl<T> implements OSGiResult<T> {
+public class OSGiResultImpl implements OSGiResult {
 
-       public Pipe<?, T> added;
        public Runnable start;
        public Runnable close;
 
-       public OSGiResultImpl(
-               Pipe<?, T> added, Runnable start, Runnable close) {
-
-               this.added = added;
+       public OSGiResultImpl(Runnable start, Runnable close) {
                this.start = start;
                this.close = close;
        }
 
        @Override
-       public void close() {
-               close.run();
+       public void start() {
+               start.run();
        }
 
-       public void pipeTo(Consumer<Tuple<T>> addedSource) {
-               added.map(t -> {addedSource.accept(t); return null;});
-
-               start.run();
+       @Override
+       public void close() {
+               close.run();
        }
 
 }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java Tue Oct 10 15:51:22 2017
@@ -23,14 +23,11 @@ package org.apache.aries.osgi.functional
 public class OnCloseOSGiImpl extends OSGiImpl<Void> {
 
        public OnCloseOSGiImpl(Runnable action) {
-               super(bundleContext -> {
-                       Pipe<Void, Void> added = Pipe.create();
-
+               super((bundleContext, op) -> {
                        Tuple<Void> tuple = Tuple.create(null);
 
-                       return new OSGiResultImpl<>(
-                               added,
-                               () -> added.getSource().accept(tuple),
+                       return new OSGiResultImpl(
+                               () -> op.accept(tuple),
                                () -> {
                                        action.run();
 

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java Tue Oct 10 15:51:22 2017
@@ -27,32 +27,21 @@ public class RouteOsgiImpl<T> extends OS
     public RouteOsgiImpl(
         OSGiImpl<T> previous, Consumer<Router<T>> routerConsumer) {
 
-        super(((bundleContext) -> {
-
-            Pipe<T, T> outgoingAddingPipe = Pipe.create();
-
-            Consumer<Tuple<T>> outgoingAddingSource =
-                outgoingAddingPipe.getSource();
-
+        super((bundleContext, op) -> {
             final RouterImpl<T> router =
-                new RouterImpl<>(outgoingAddingSource);
+                new RouterImpl<>(op);
 
             routerConsumer.accept(router);
 
-            OSGiResultImpl<T> osgiResult = previous._operation.run(
-                bundleContext);
-
-            osgiResult.added.map(
+            OSGiResultImpl osgiResult = previous._operation.run(
+                bundleContext,
                 t -> {
                     router._adding.accept(t);
 
                     t.onTermination(() -> router._leaving.accept(t));
-
-                    return null;
                 });
 
-            return new OSGiResultImpl<>(
-                outgoingAddingPipe,
+            return new OSGiResultImpl(
                 () -> {
                     router._start.run();
                     osgiResult.start.run();
@@ -61,7 +50,7 @@ public class RouteOsgiImpl<T> extends OS
                     router._close.run();
                     osgiResult.close.run();
                 });
-        }));
+        });
     }
 
     static class RouterImpl<T> implements Router<T> {
@@ -94,7 +83,7 @@ public class RouteOsgiImpl<T> extends OS
         public SentEvent<T> signalAdd(Event<T> event) {
             Tuple<T> tuple = (Tuple<T>) event;
 
-            Tuple<T> copy = Tuple.create(tuple.t);
+            Tuple<T> copy = Tuple.create(tuple._t);
 
             tuple.addRelatedTuple(copy);
 

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java Tue Oct 10 15:51:22 2017
@@ -37,18 +37,15 @@ public class ServiceReferenceOSGi<T> ext
        private Class<T> _clazz;
 
        public ServiceReferenceOSGi(String filterString, Class<T> clazz) {
-               super(bundleContext -> {
-                       Pipe<ServiceReference<T>, ServiceReference<T>>
-                               added = Pipe.create();
-
+               super((bundleContext, op) -> {
                        ServiceTracker<T, AtomicReference<Tuple<ServiceReference<T>>>>
                                serviceTracker = new ServiceTracker<>(
                                        bundleContext,
                                        buildFilter(bundleContext, filterString, clazz),
-                                       new DefaultServiceTrackerCustomizer<>(added.getSource()));
+                                       new DefaultServiceTrackerCustomizer<>(op));
 
-                       return new OSGiResultImpl<>(
-                               added, serviceTracker::open, serviceTracker::close);
+                       return new OSGiResultImpl(
+                               serviceTracker::open, serviceTracker::close);
                });
 
                _filterString = filterString;
@@ -59,19 +56,17 @@ public class ServiceReferenceOSGi<T> ext
        public <S> OSGiImpl<S> flatMap(
                Function<? super ServiceReference<T>, OSGi<? extends S>> fun) {
 
-               return new OSGiImpl<>(bundleContext -> {
-                       Pipe<S, S> added = Pipe.create();
-
+               return new OSGiImpl<>((bundleContext, op) -> {
                        ServiceTracker<T, ?> serviceTracker =
                                new ServiceTracker<>(
                                        bundleContext,
                                        buildFilter(
                                                bundleContext, _filterString, _clazz),
                                                new FlatMapServiceTrackerCustomizer<>(
-                                                       fun, bundleContext, added.getSource()));
+                                                       fun, bundleContext, op));
 
-                       return new OSGiResultImpl<>(
-                               added, serviceTracker::open, serviceTracker::close);
+                       return new OSGiResultImpl(
+                               serviceTracker::open, serviceTracker::close);
                });
        }
 
@@ -121,45 +116,44 @@ public class ServiceReferenceOSGi<T> ext
        }
 
        private static class FlatMapServiceTrackerCustomizer<T, S>
-               implements ServiceTrackerCustomizer<T, AtomicReference<OSGiResult<S>>> {
+               implements ServiceTrackerCustomizer<T, AtomicReference<OSGiResult>> {
                private final Function<? super ServiceReference<T>, OSGi<? extends S>>
                        _fun;
                private final BundleContext _bundleContext;
-               private final Consumer<Tuple<S>> _addedSource;
+               private final Consumer<Tuple<S>> _op;
 
                FlatMapServiceTrackerCustomizer(
                        Function<? super ServiceReference<T>, OSGi<? extends S>> fun,
-                       BundleContext bundleContext, Consumer<Tuple<S>> addedSource) {
+                       BundleContext bundleContext, Consumer<Tuple<S>> op) {
 
                        _fun = fun;
                        _bundleContext = bundleContext;
-                       _addedSource = addedSource;
+                       _op = op;
                }
 
                @Override
-        public AtomicReference<OSGiResult<S>> addingService(
+        public AtomicReference<OSGiResult> addingService(
                ServiceReference<T> reference) {
 
-                       OSGiResultImpl<S> osgiResult = doFlatMap(reference);
+                       OSGiResultImpl osgiResult = doFlatMap(reference);
 
                        return new AtomicReference<>(osgiResult);
         }
 
-               private OSGiResultImpl<S> doFlatMap(ServiceReference<T> reference) {
+               private OSGiResultImpl doFlatMap(ServiceReference<T> reference) {
                        OSGiImpl<S> program = (OSGiImpl<S>) _fun.apply(reference);
 
-                       OSGiResultImpl<S> osgiResult = program._operation.run(
-                               _bundleContext);
+                       OSGiResultImpl result = program._operation.run(_bundleContext, _op);
 
-                       osgiResult.pipeTo(_addedSource);
+                       result.start();
 
-                       return osgiResult;
+                       return result;
                }
 
                @Override
         public void modifiedService(
                ServiceReference<T> reference,
-                       AtomicReference<OSGiResult<S>> tracked) {
+                       AtomicReference<OSGiResult> tracked) {
 
                        tracked.get().close();
 
@@ -169,7 +163,7 @@ public class ServiceReferenceOSGi<T> ext
                @Override
         public void removedService(
             ServiceReference<T> reference,
-                       AtomicReference<OSGiResult<S>> tracked) {
+                       AtomicReference<OSGiResult> tracked) {
 
             tracked.get().close();
         }

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java Tue Oct 10 15:51:22 2017
@@ -33,12 +33,12 @@ public class ServiceRegistrationOSGiImpl
        public ServiceRegistrationOSGiImpl(
                Class<T> clazz, T service, Map<String, Object> properties) {
 
-               super(bundleContext -> {
-                       ServiceRegistration<T> serviceRegistration =
+               super((bundleContext, op) -> {
+                       ServiceRegistration<?> serviceRegistration =
                                bundleContext.registerService(
                                        clazz, service, getProperties(properties));
 
-                       return getServiceRegistrationOSGiResult(serviceRegistration);
+                       return getServiceRegistrationOSGiResult(serviceRegistration, op);
                });
        }
 
@@ -46,25 +46,24 @@ public class ServiceRegistrationOSGiImpl
                Class<T> clazz, ServiceFactory<T> serviceFactory,
                Map<String, Object> properties) {
 
-               super(bundleContext -> {
-                       ServiceRegistration<T> serviceRegistration =
+               super((bundleContext, op) -> {
+                       ServiceRegistration<?> serviceRegistration =
                                bundleContext.registerService(
                                        clazz, serviceFactory, getProperties(properties));
 
-                       return getServiceRegistrationOSGiResult(serviceRegistration);
+                       return getServiceRegistrationOSGiResult(serviceRegistration, op);
                });
        }
 
        public ServiceRegistrationOSGiImpl(
                String[] clazz, Object service, Map<String, ?> properties) {
 
-               super(bundleContext -> {
+               super((bundleContext, op) -> {
                        ServiceRegistration<?> serviceRegistration =
                                bundleContext.registerService(
                                        clazz, service, new Hashtable<>(properties));
 
-                       return getServiceRegistrationOSGiResult(
-                               (ServiceRegistration)serviceRegistration);
+                       return getServiceRegistrationOSGiResult(serviceRegistration, op);
                });
        }
 
@@ -78,22 +77,15 @@ public class ServiceRegistrationOSGiImpl
                return new Hashtable<>(properties);
        }
 
-       private static <T> OSGiResultImpl<ServiceRegistration<T>>
+       private static <T> OSGiResultImpl
                getServiceRegistrationOSGiResult(
-                       ServiceRegistration<T> serviceRegistration) {
-
-               Pipe<ServiceRegistration<T>, ServiceRegistration<T>> added =
-                       Pipe.create();
-
-               Consumer<Tuple<ServiceRegistration<T>>> addedSource =
-            added.getSource();
+               ServiceRegistration<?> serviceRegistration,
+               Consumer<Tuple<ServiceRegistration<T>>> op) {
 
-               Tuple<ServiceRegistration<T>> tuple = Tuple.create(
-            serviceRegistration);
+               Tuple<ServiceRegistration<?>> tuple = Tuple.create(serviceRegistration);
 
-               return new OSGiResultImpl<>(
-            added,
-            () -> addedSource.accept(tuple),
+               return new OSGiResultImpl(
+            () -> ((Consumer)op).accept(tuple),
             () -> {
                 try {
                     serviceRegistration.unregister();

Modified: aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java (original)
+++ aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Tuple.java Tue Oct 10 15:51:22 2017
@@ -25,7 +25,6 @@ import java.util.Deque;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
@@ -33,21 +32,23 @@ import java.util.function.Function;
  */
 class Tuple<T> implements Event<T>, SentEvent<T> {
 
-       public T t;
-       private Deque<Runnable> _closingHandlers = new LinkedList<>();
-       private DoublyLinkedList<Tuple<?>> _relatedTuples =
-               new DoublyLinkedList<>();
-       private AtomicBoolean closed = new AtomicBoolean(false);
+       public final T _t;
+       private final Deque<Runnable> _closingHandlers;
+       private final DoublyLinkedList<Tuple<?>> _relatedTuples;
+       private final AtomicBoolean closed = new AtomicBoolean(false);
        private Event<T> cause = this;
 
        private Tuple(T t) {
-               this(t, new LinkedList<>());
+               this(t, new LinkedList<>(), new DoublyLinkedList<>());
        }
 
-       private Tuple(T t, Deque<Runnable> closingHandlers) {
-               this.t = t;
+       private Tuple(
+               T t, Deque<Runnable> closingHandlers,
+               DoublyLinkedList<Tuple<?>> relatedTuples) {
 
+               _t = t;
                _closingHandlers = closingHandlers;
+               _relatedTuples = relatedTuples;
        }
 
        public void addRelatedTuple(Tuple<?> tuple) {
@@ -67,12 +68,12 @@ class Tuple<T> implements Event<T>, Sent
 
        @Override
        public T getContent() {
-               return t;
+               return _t;
        }
 
        @Override
        public int hashCode() {
-               return t.hashCode();
+               return _t.hashCode();
        }
 
        @Override
@@ -85,7 +86,7 @@ class Tuple<T> implements Event<T>, Sent
        }
 
        public <S> Tuple<S> map(Function<? super T, ? extends S> fun) {
-               return new Tuple<>(fun.apply(t), _closingHandlers);
+               return new Tuple<>(fun.apply(_t), _closingHandlers, _relatedTuples);
        }
 
        public void onTermination(Runnable terminator) {

Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/ComponentTest.java Tue Oct 10 15:51:22 2017
@@ -22,6 +22,7 @@ import org.apache.aries.osgi.functional.
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
@@ -110,7 +111,7 @@ public class ComponentTest {
 
         Configuration factoryConfiguration = null;
 
-        try (OSGiResult<?> run = program.run(_bundleContext)) {
+        try (OSGiResult run = program.run(_bundleContext)) {
             factoryConfiguration = _configurationAdmin.createFactoryConfiguration(
                 "org.components.MyComponent");
             factoryConfiguration.update(new Hashtable<>());
@@ -222,7 +223,7 @@ public class ComponentTest {
 
         Configuration factoryConfiguration = null;
 
-        try (OSGiResult<?> run = program.run(_bundleContext)) {
+        try (OSGiResult run = program.run(_bundleContext)) {
             factoryConfiguration =
                 _configurationAdmin.createFactoryConfiguration(
                     "org.components.MyComponent");

Modified: aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java?rev=1811723&r1=1811722&r2=1811723&view=diff
==============================================================================
--- aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java (original)
+++ aries/trunk/component-dsl/itests/src/main/java/org/apache/aries/osgi/functional/test/DSLTest.java Tue Oct 10 15:51:22 2017
@@ -70,7 +70,7 @@ public class DSLTest {
 
         assertEquals(0, atomicInteger.get());
 
-        try (OSGiResult<Integer> result = just.run(
+        try (OSGiResult result = just.run(
             bundleContext, atomicInteger::set))
         {
             assertEquals(25, atomicInteger.get());
@@ -80,7 +80,7 @@ public class DSLTest {
 
         OSGi<Integer> map = just(25).map(s -> s + 5);
 
-        try (OSGiResult<Integer> result = map.run(
+        try (OSGiResult result = map.run(
             bundleContext, atomicInteger::set))
         {
             assertEquals(30, atomicInteger.get());
@@ -90,7 +90,7 @@ public class DSLTest {
 
         OSGi<Integer> flatMap = just(25).flatMap(s -> just(s + 10));
 
-        try (OSGiResult<Integer> result = flatMap.run(
+        try (OSGiResult result = flatMap.run(
             bundleContext, atomicInteger::set))
         {
             assertEquals(35, atomicInteger.get());
@@ -100,7 +100,7 @@ public class DSLTest {
 
         OSGi<Integer> filter = just(25).filter(s -> s % 2 == 0);
 
-        try (OSGiResult<Integer> result = filter.run(
+        try (OSGiResult result = filter.run(
             bundleContext, atomicInteger::set))
         {
             assertEquals(0, atomicInteger.get());
@@ -110,7 +110,7 @@ public class DSLTest {
 
         filter = just(25).filter(s -> s % 2 != 0);
 
-        try (OSGiResult<Integer> result = filter.run(
+        try (OSGiResult result = filter.run(
             bundleContext, atomicInteger::set))
         {
             assertEquals(25, atomicInteger.get());
@@ -126,7 +126,7 @@ public class DSLTest {
         ServiceRegistration<Service> serviceRegistration = null;
 
         try(
-            OSGiResult<ServiceReference<Service>> osGiResult =
+            OSGiResult osGiResult =
                 serviceReferences(Service.class).
                 run(bundleContext, atomicReference::set)
         ) {
@@ -159,7 +159,7 @@ public class DSLTest {
         ServiceRegistration<Service> serviceRegistration = null;
 
         try(
-            OSGiResult<?> osGiResult = program.run(
+            OSGiResult osGiResult = program.run(
             bundleContext, atomicReference::set)
         ) {
             assertNull(atomicReference.get());
@@ -194,7 +194,7 @@ public class DSLTest {
 
         CountDownLatch countDownLatch = new CountDownLatch(1);
 
-        try(OSGiResult<Dictionary<String, ?>> result =
+        try(OSGiResult result =
             configuration("test.configuration").run(
                 bundleContext,
                 x -> {
@@ -239,7 +239,7 @@ public class DSLTest {
 
         Configuration configuration = null;
 
-        try(OSGiResult<Dictionary<String, ?>> result =
+        try(OSGiResult result =
             configurations("test.configuration").run(
                 bundleContext,
                 x -> {
@@ -275,7 +275,7 @@ public class DSLTest {
 
         Service service = new Service();
 
-        OSGiResult<ServiceRegistration<Service>> result = register(
+        OSGiResult result = register(
             Service.class, service, new HashMap<>()).
             run(bundleContext);
 
@@ -313,7 +313,7 @@ public class DSLTest {
                 }})
             );
 
-        OSGiResult<ServiceRegistration<Service>> result = program.run(
+        OSGiResult result = program.run(
             bundleContext);
 
         assertEquals(
@@ -454,7 +454,7 @@ public class DSLTest {
             program = services(filter).then(program);
         }
 
-        try (OSGiResult<?> result = program.run(bundleContext)) {
+        try (OSGiResult result = program.run(bundleContext)) {
             assertFalse(closed.get());
             assertFalse(executed.get());
 
@@ -509,7 +509,7 @@ public class DSLTest {
 
         assertNull(current.get());
 
-        try (OSGiResult<Void> result = program.run(bundleContext)) {
+        try (OSGiResult result = program.run(bundleContext)) {
             ServiceRegistration<Service> serviceRegistrationOne =
                 bundleContext.registerService(
                     Service.class, new Service(),


Reply via email to