Author: csierra
Date: Wed Aug  2 15:55:05 2017
New Revision: 1803838

URL: http://svn.apache.org/viewvc?rev=1803838&view=rev
Log:
Fix generic flatMap

it was not propagating removed instances

Modified:
    
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java

Modified: 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
URL: 
http://svn.apache.org/viewvc/aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java?rev=1803838&r1=1803837&r2=1803838&view=diff
==============================================================================
--- 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 (original)
+++ 
aries/trunk/component-dsl/component-dsl/src/main/java/org/apache/aries/osgi/functional/internal/OSGiImpl.java
 Wed Aug  2 15:55:05 2017
@@ -52,7 +52,7 @@ public class OSGiImpl<T> implements OSGi
        public <S> OSGiImpl<S> flatMap(Function<? super T, OSGi<? extends S>> 
fun) {
                return new OSGiImpl<>(
                        ((bundleContext) -> {
-                               Map<Object, OSGiResult<? extends S>> identities 
=
+                               Map<Object, OSGiResult<?>> identities =
                                        new IdentityHashMap<>();
 
                                AtomicReference<Runnable> closeReference =
@@ -66,61 +66,52 @@ public class OSGiImpl<T> implements OSGi
 
                                Consumer<Tuple<S>> removedSource = 
removed.getSource();
 
-                               AtomicReference<Tuple<S>> tupleAtomicReference =
-                                       new AtomicReference<>();
-
-                               OSGiResultImpl<S> osgiResult = new 
OSGiResultImpl<>(
-                                       added, removed, null,
+                               return new OSGiResultImpl<>(
+                                       added, removed,
                                        () -> {
-                                               synchronized (identities) {
-                                                       
identities.values().forEach(OSGiResult::close);
-                                               }
+                                               OSGiResultImpl<T> or1 = 
_operation.run(bundleContext);
 
-                                               closeReference.get().run();
-                                       });
-
-                               osgiResult.start = () -> {
-                                       OSGiResultImpl<T> or1 = 
_operation.run(bundleContext);
+                                               closeReference.set(or1.close);
 
-                                       closeReference.set(or1.close);
+                                               or1.added.map(t -> {
+                                                       OSGiImpl<S> program =
+                                                               
(OSGiImpl<S>)fun.apply(t.t);
 
-                                       or1.added.map(t -> {
-                                               OSGi<? extends S> program = 
fun.apply(t.t);
+                                                       OSGiResultImpl<S> or2 =
+                                                               
program._operation.run(bundleContext);
 
-                                               OSGiResult<? extends S> or2 = 
program.run(
-                                                       bundleContext,
-                                                       s -> {
-                                                               Tuple<S> tuple 
= Tuple.create(s);
+                                                       or2.added.map(s -> 
{addedSource.accept(s); return null;});
+                                                       or2.removed.map(s -> 
{removedSource.accept(s); return null;});
 
-                                                               
tupleAtomicReference.set(tuple);
+                                                       or2.start.run();
 
-                                                               
addedSource.accept(tuple);
-                                                       });
+                                                       
identities.put(t.original, or2);
 
-                                               identities.put(t.original, or2);
+                                                       return null;
+                                               });
 
-                                               return null;
-                                       });
+                                               or1.removed.map(t -> {
+                                                       synchronized 
(identities) {
+                                                               OSGiResult<?> 
osgiResult1 =
+                                                                       
identities.remove(t.original);
 
-                                       or1.removed.map(t -> {
-                                               synchronized (identities) {
-                                                       OSGiResult<? extends S> 
osgiResult1 =
-                                                               
identities.remove(t.original);
+                                                               if (osgiResult1 
!= null) {
+                                                                       
osgiResult1.close();
+                                                               }
+                                                       }
 
-                                                       
removedSource.accept(tupleAtomicReference.get());
+                                                       return null;
+                                               });
 
-                                                       if (osgiResult1 != 
null) {
-                                                               
osgiResult1.close();
-                                                       }
+                                               or1.start.run();
+                                       },
+                                       () -> {
+                                               synchronized (identities) {
+                                                       
identities.values().forEach(OSGiResult::close);
                                                }
 
-                                               return null;
+                                               closeReference.get().run();
                                        });
-
-                                       or1.start.run();
-                               };
-
-                               return osgiResult;
                        }
                        ));
        }


Reply via email to