Author: csierra
Date: Wed May 24 21:43:58 2017
New Revision: 1796109

URL: http://svn.apache.org/viewvc?rev=1796109&view=rev
Log:
Symmetrically signal object deregistration

before closing so foreach and other usages work as expected.

Modified:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java?rev=1796109&r1=1796108&r2=1796109&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/JustOSGiImpl.java
 Wed May 24 21:43:58 2017
@@ -39,9 +39,16 @@ public class JustOSGiImpl<T> extends OSG
 
                        Consumer<Tuple<T>> source = added.getSource();
 
+                       Pipe<Tuple<T>, Tuple<T>> removed = Pipe.create();
+
+                       Consumer<Tuple<T>> removedSource = removed.getSource();
+
+                       Tuple<T> tuple = Tuple.create(t);
+
                        return new OSGiResultImpl<>(
-                               added, Pipe.create(),
-                               () -> source.accept(Tuple.create(t)), 
OSGi.NOOP);
+                               added, removed,
+                               () -> source.accept(tuple),
+                               () -> removedSource.accept(tuple));
                }));
 
                _t = t;
@@ -54,21 +61,37 @@ public class JustOSGiImpl<T> extends OSG
 
                        Consumer<Tuple<S>> addedSource = added.getSource();
 
+                       Pipe<Tuple<S>, Tuple<S>> removed = Pipe.create();
+
+                       Consumer<Tuple<S>> removedSource = removed.getSource();
+
                        AtomicReference<OSGiResult<? extends S>> 
atomicReference =
                                new AtomicReference<>(null);
+                       AtomicReference<Tuple<S>> tupleReference =
+                               new AtomicReference<>();
 
                        return new OSGiResultImpl<>(
-                               added, Pipe.create(),
+                               added, removed,
                                () -> {
                                        OSGi<? extends S> next = fun.apply(_t);
 
                                        atomicReference.set(
                                                next.run(
                                                        bundleContext,
-                                                       s -> 
addedSource.accept(Tuple.create(s))));
+                                                       s -> {
+                                                               Tuple<S> tuple 
= Tuple.create(s);
+
+                                                               
tupleReference.set(tuple);
+
+                                                               
addedSource.accept(tuple);
+                                                       }));
 
                                },
-                               () -> atomicReference.get().close());
+                               () -> {
+                                       
removedSource.accept(tupleReference.get());
+
+                                       atomicReference.get().close();
+                               });
                });
        }
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1796109&r1=1796108&r2=1796109&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 Wed May 24 21:43:58 2017
@@ -60,8 +60,15 @@ public class OSGiImpl<T> implements OSGi
 
                                Consumer<Tuple<S>> addedSource = 
added.getSource();
 
+                               Pipe<Tuple<S>, Tuple<S>> removed = 
Pipe.create();
+
+                               Consumer<Tuple<S>> removedSource = 
removed.getSource();
+
+                               AtomicReference<Tuple<S>> tupleAtomicReference =
+                                       new AtomicReference<>();
+
                                OSGiResultImpl<S> osgiResult = new 
OSGiResultImpl<>(
-                                       added, Pipe.create(), null,
+                                       added, removed, null,
                                        () -> {
                                                synchronized (identities) {
                                                        
identities.values().forEach(OSGiResult::close);
@@ -80,7 +87,13 @@ public class OSGiImpl<T> implements OSGi
 
                                                OSGiResult<? extends S> or2 = 
program.run(
                                                        bundleContext,
-                                                       s -> 
addedSource.accept(Tuple.create(s)));
+                                                       s -> {
+                                                               Tuple<S> tuple 
= Tuple.create(s);
+
+                                                               
tupleAtomicReference.set(tuple);
+
+                                                               
addedSource.accept(tuple);
+                                                       });
 
                                                identities.put(t.original, or2);
 
@@ -92,6 +105,8 @@ public class OSGiImpl<T> implements OSGi
                                                        OSGiResult<? extends S> 
osgiResult1 =
                                                                
identities.remove(t.original);
 
+                                                       
removedSource.accept(tupleAtomicReference.get());
+
                                                        if (osgiResult1 != 
null) {
                                                                
osgiResult1.close();
                                                        }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java?rev=1796109&r1=1796108&r2=1796109&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OnCloseOSGiImpl.java
 Wed May 24 21:43:58 2017
@@ -24,12 +24,20 @@ public class OnCloseOSGiImpl extends OSG
 
        public OnCloseOSGiImpl(Runnable action) {
                super(bundleContext -> {
-                       Pipe<Tuple<Void>, Tuple<Void>> pipe = Pipe.create();
+                       Pipe<Tuple<Void>, Tuple<Void>> added = Pipe.create();
+
+                       Pipe<Tuple<Void>, Tuple<Void>> removed = Pipe.create();
+
+                       Tuple<Void> tuple = Tuple.create(null);
 
                        return new OSGiResultImpl<>(
-                               pipe, Pipe.create(),
-                               () -> 
pipe.getSource().accept(Tuple.create(null)),
-                               action::run);
+                               added, removed,
+                               () -> added.getSource().accept(tuple),
+                               () -> {
+                                       action.run();
+                                       removed.getSource().accept(tuple);
+                               }
+                       );
                });
        }
 }

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java?rev=1796109&r1=1796108&r2=1796109&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/ServiceRegistrationOSGiImpl.java
 Wed May 24 21:43:58 2017
@@ -47,8 +47,14 @@ public class ServiceRegistrationOSGiImpl
                        Tuple<ServiceRegistration<T>> tuple = Tuple.create(
                                serviceRegistration);
 
+                       Pipe<Tuple<ServiceRegistration<T>>, 
Tuple<ServiceRegistration<T>>>
+                               removed = Pipe.create();
+
+                       Consumer<Tuple<ServiceRegistration<T>>> removedSource =
+                               removed.getSource();
+
                        return new OSGiResultImpl<>(
-                               added, Pipe.create(),
+                               added, removed,
                                () -> addedSource.accept(tuple),
                                () -> {
                                        try {
@@ -56,6 +62,9 @@ public class ServiceRegistrationOSGiImpl
                                        }
                                        catch (Exception e) {
                                        }
+                                       finally {
+                                               removedSource.accept(tuple);
+                                       }
                                });
                });
        }


Reply via email to