Author: csierra
Date: Tue Oct 10 15:51:07 2017
New Revision: 1811722

URL: http://svn.apache.org/viewvc?rev=1811722&view=rev
Log:
[Component-DSL] Take closed into account

Modified:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java?rev=1811722&r1=1811721&r2=1811722&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/BundleOSGi.java
 Tue Oct 10 15:51:07 2017
@@ -36,7 +36,7 @@ public class BundleOSGi extends OSGiImpl
 
        public BundleOSGi(int stateMask) {
                super(bundleContext -> {
-                       Pipe<Tuple<Bundle>, Tuple<Bundle>> added = 
Pipe.create();
+                       Pipe<Bundle, Bundle> added = Pipe.create();
 
                        Consumer<Tuple<Bundle>> addedSource = added.getSource();
 
@@ -87,7 +87,7 @@ public class BundleOSGi extends OSGiImpl
                Function<? super Bundle, OSGi<? extends S>> fun) {
 
                return new OSGiImpl<>(bundleContext -> {
-                       Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+                       Pipe<S, S> added = Pipe.create();
 
                        Consumer<Tuple<S>> addedSource = added.getSource();
 

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java?rev=1811722&r1=1811721&r2=1811722&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationOSGiImpl.java
 Tue Oct 10 15:51:07 2017
@@ -43,8 +43,8 @@ public class ConfigurationOSGiImpl
                        AtomicReference<ServiceRegistration<ManagedService>>
                                serviceRegistrationReferece = new 
AtomicReference<>(null);
 
-                       Pipe<Tuple<Dictionary<String, ?>>, 
Tuple<Dictionary<String, ?>>>
-                               added = Pipe.create();
+                       Pipe<Dictionary<String, ?>, Dictionary<String, ?>> 
added =
+                               Pipe.create();
 
                        Consumer<Tuple<Dictionary<String, ?>>> addedSource =
                                added.getSource();

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java?rev=1811722&r1=1811721&r2=1811722&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ConfigurationsOSGiImpl.java
 Tue Oct 10 15:51:07 2017
@@ -42,7 +42,7 @@ public class ConfigurationsOSGiImpl
                        
AtomicReference<ServiceRegistration<ManagedServiceFactory>>
                                serviceRegistrationReference = new 
AtomicReference<>(null);
 
-                       Pipe<Tuple<Dictionary<String, ?>>, 
Tuple<Dictionary<String, ?>>>
+                       Pipe<Dictionary<String, ?>, Dictionary<String, ?>>
                                added = Pipe.create();
 
                        Consumer<Tuple<Dictionary<String, ?>>> addedSource =

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java?rev=1811722&r1=1811721&r2=1811722&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/DistributeOSGi.java
 Tue Oct 10 15:51:07 2017
@@ -34,7 +34,7 @@ public class DistributeOSGi<T> extends O
     @SafeVarargs
     public DistributeOSGi(OSGi<T>... programs) {
         super(bundleContext -> {
-            Pipe<Tuple<T>, Tuple<T>> added = Pipe.create();
+            Pipe<T, T> added = Pipe.create();
 
             Consumer<Tuple<T>> addedSource = added.getSource();
 

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java?rev=1811722&r1=1811721&r2=1811722&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/FlatMapImpl.java
 Tue Oct 10 15:51:07 2017
@@ -38,7 +38,7 @@ public class FlatMapImpl<T, S> extends O
                        AtomicReference<Runnable> closeReference =
                                new AtomicReference<>(NOOP);
 
-                       Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+                       Pipe<S, S> added = Pipe.create();
 
                        Consumer<Tuple<S>> addedSource = added.getSource();
 
@@ -51,7 +51,7 @@ public class FlatMapImpl<T, S> extends O
                                        closeReference.set(or1.close);
 
                                        or1.added.map(t -> {
-                                               OSGiImpl<S> program = 
(OSGiImpl<S>)fun.apply(t.t);
+                                               OSGiImpl<S> program = 
(OSGiImpl<S>)fun.apply((T)t.t);
 
                                                OSGiResultImpl<S> or2 =
                                                        
program._operation.run(bundleContext);

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java?rev=1811722&r1=1811721&r2=1811722&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
 Tue Oct 10 15:51:07 2017
@@ -45,7 +45,7 @@ public class JustOSGiImpl<T> extends OSG
        public JustOSGiImpl(Supplier<Collection<T>> t) {
                super(((bundleContext) -> {
 
-                       Pipe<Tuple<T>, Tuple<T>> added = Pipe.create();
+                       Pipe<T, T> added = Pipe.create();
 
                        AtomicReference<Collection<Tuple<T>>> 
collectionAtomicReference =
                                new AtomicReference<>();
@@ -76,7 +76,7 @@ public class JustOSGiImpl<T> extends OSG
        @Override
        public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> 
fun) {
                return new OSGiImpl<>(bundleContext -> {
-                       Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+                       Pipe<S, S> added = Pipe.create();
 
                        AtomicReference<Runnable> atomicReference = new 
AtomicReference<>(
                                NOOP);

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1811722&r1=1811721&r2=1811722&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 Tue Oct 10 15:51:07 2017
@@ -216,7 +216,7 @@ public class OSGiImpl<T> implements OSGi
                                DoublyLinkedList<Tuple<Function<T, S>>> funs =
                                        new DoublyLinkedList<>();
 
-                               Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+                               Pipe<S, S> added = Pipe.create();
 
                                Consumer<Tuple<S>> addedSource = 
added.getSource();
 

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java?rev=1811722&r1=1811721&r2=1811722&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiResultImpl.java
 Tue Oct 10 15:51:07 2017
@@ -26,12 +26,12 @@ import java.util.function.Consumer;
  */
 public class OSGiResultImpl<T> implements OSGiResult<T> {
 
-       public Pipe<?, Tuple<T>> added;
+       public Pipe<?, T> added;
        public Runnable start;
        public Runnable close;
 
        public OSGiResultImpl(
-               Pipe<?, Tuple<T>> added, Runnable start, Runnable close) {
+               Pipe<?, T> added, Runnable start, Runnable close) {
 
                this.added = added;
                this.start = start;

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java?rev=1811722&r1=1811721&r2=1811722&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
 Tue Oct 10 15:51:07 2017
@@ -24,7 +24,7 @@ public class OnCloseOSGiImpl extends OSG
 
        public OnCloseOSGiImpl(Runnable action) {
                super(bundleContext -> {
-                       Pipe<Tuple<Void>, Tuple<Void>> added = Pipe.create();
+                       Pipe<Void, Void> added = Pipe.create();
 
                        Tuple<Void> tuple = Tuple.create(null);
 

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java?rev=1811722&r1=1811721&r2=1811722&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/Pipe.java
 Tue Oct 10 15:51:07 2017
@@ -25,9 +25,9 @@ import java.util.function.Function;
  */
 class Pipe<I, O> {
 
-       private Function<I, O> pipe;
+       private Function<Tuple<I>, Tuple<O>> pipe;
 
-       private Pipe(Function<I, O> fun) {
+       private Pipe(Function<Tuple<I>, Tuple<O>> fun) {
                this.pipe = fun;
        }
 
@@ -35,11 +35,21 @@ class Pipe<I, O> {
                return new Pipe<>(x -> x);
        }
 
-       public Consumer<I> getSource() {
-               return i -> pipe.apply(i);
+       public Consumer<Tuple<I>> getSource() {
+               return i -> {
+                       if (i.isClosed()) {
+                               return;
+                       }
+
+                       pipe.apply(i);
+
+                       if (i.isClosed()) {
+                               i.terminate();
+                       }
+               };
        }
 
-       <U> Pipe<I, U> map(Function<? super O, ? extends U> fun) {
+       <U> Pipe<I, U> map(Function<Tuple<O>, Tuple<U>> fun) {
                this.pipe = (Function)this.pipe.andThen(fun);
 
                return (Pipe<I, U>)this;

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java?rev=1811722&r1=1811721&r2=1811722&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/RouteOsgiImpl.java
 Tue Oct 10 15:51:07 2017
@@ -29,7 +29,7 @@ public class RouteOsgiImpl<T> extends OS
 
         super(((bundleContext) -> {
 
-            Pipe<Tuple<T>, Tuple<T>> outgoingAddingPipe = Pipe.create();
+            Pipe<T, T> outgoingAddingPipe = Pipe.create();
 
             Consumer<Tuple<T>> outgoingAddingSource =
                 outgoingAddingPipe.getSource();

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java?rev=1811722&r1=1811721&r2=1811722&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceReferenceOSGi.java
 Tue Oct 10 15:51:07 2017
@@ -38,7 +38,7 @@ public class ServiceReferenceOSGi<T> ext
 
        public ServiceReferenceOSGi(String filterString, Class<T> clazz) {
                super(bundleContext -> {
-                       Pipe<Tuple<ServiceReference<T>>, 
Tuple<ServiceReference<T>>>
+                       Pipe<ServiceReference<T>, ServiceReference<T>>
                                added = Pipe.create();
 
                        ServiceTracker<T, 
AtomicReference<Tuple<ServiceReference<T>>>>
@@ -60,7 +60,7 @@ public class ServiceReferenceOSGi<T> ext
                Function<? super ServiceReference<T>, OSGi<? extends S>> fun) {
 
                return new OSGiImpl<>(bundleContext -> {
-                       Pipe<Tuple<S>, Tuple<S>> added = Pipe.create();
+                       Pipe<S, S> added = Pipe.create();
 
                        ServiceTracker<T, ?> serviceTracker =
                                new ServiceTracker<>(

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java?rev=1811722&r1=1811721&r2=1811722&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java
 Tue Oct 10 15:51:07 2017
@@ -82,8 +82,8 @@ public class ServiceRegistrationOSGiImpl
                getServiceRegistrationOSGiResult(
                        ServiceRegistration<T> serviceRegistration) {
 
-               Pipe<Tuple<ServiceRegistration<T>>, 
Tuple<ServiceRegistration<T>>>
-            added = Pipe.create();
+               Pipe<ServiceRegistration<T>, ServiceRegistration<T>> added =
+                       Pipe.create();
 
                Consumer<Tuple<ServiceRegistration<T>>> addedSource =
             added.getSource();


Reply via email to